You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 00:56:52 UTC

[pulsar] 06/15: [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 012e1b1a5541d0ca17c65efc1f4bb905ad76ea7b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Nov 18 20:37:37 2021 +0800

    [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
    
    (cherry picked from commit fa7be236efcc6772e0aac05f25f8d5f3cf0ad741)
---
 conf/broker.conf                                   |  4 ++
 conf/standalone.conf                               |  4 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++
 .../pulsar/broker/service/AbstractTopic.java       | 38 ++++++----
 .../SchemaCompatibilityCheckTest.java              | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |  2 +-
 6 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 3dd8590..2d7df90 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -270,6 +270,10 @@ brokerMaxConnections=0
 # The maximum number of connections per IP. If it exceeds, new connections are rejected.
 brokerMaxConnectionsPerIp=0
 
+# Allow schemas to be auto-updated at the broker level. This can be overridden per namespace
+# via the 'is_allow_auto_update_schema' namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 878a852..577a6ff 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
 maxTopicsPerNamespace=0
 
+# Allow schemas to be auto-updated at the broker level. This can be overridden per namespace
+# via the 'is_allow_auto_update_schema' namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 69a3d35..0d578f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int brokerMaxConnectionsPerIp = 0;
 
     @FieldContext(
+        category = CATEGORY_POLICIES,
+        dynamic = true,
+        doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+            + " of namespace policy. This is enabled by default."
+    )
+    private boolean isAllowAutoUpdateSchemaEnabled = true;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Enable check for minimum allowed client library version"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9463be9..30c0fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -95,7 +95,7 @@ public abstract class AbstractTopic implements Topic {
     protected volatile boolean isEncryptionRequired = false;
     protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
             SchemaCompatibilityStrategy.FULL;
-    protected volatile boolean isAllowAutoUpdateSchema = true;
+    protected volatile Boolean isAllowAutoUpdateSchema;
     // schema validation enforced flag
     protected volatile boolean schemaValidationEnforced = false;
 
@@ -328,20 +328,28 @@ public abstract class AbstractTopic implements Topic {
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
-        return isAllowAutoUpdateSchema ? schemaRegistryService
-                .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
-                : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
-                schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
-                        .thenCompose(schemaVersion -> {
-                    if (schemaVersion == null) {
-                        return FutureUtil
-                                .failedFuture(
-                                        new IncompatibleSchemaException(
-                                                "Schema not found and schema auto updating is disabled."));
-                    } else {
-                        return CompletableFuture.completedFuture(schemaVersion);
-                    }
-                }));
+
+        if (allowAutoUpdateSchema()) {
+            return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
+        } else {
+            return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
+                    schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
+                            .thenCompose(schemaVersion -> {
+                                if (schemaVersion == null) {
+                                    return FutureUtil.failedFuture(new IncompatibleSchemaException(
+                                            "Schema not found and schema auto updating is disabled."));
+                                } else {
+                                    return CompletableFuture.completedFuture(schemaVersion);
+                                }
+                            }));
+        }
+    }
+
+    private boolean allowAutoUpdateSchema() {
+        // A non-null field value wins; null means unset, so fall back to the broker-wide config.
+        // Read the volatile field once so a concurrent reset to null cannot NPE during unboxing.
+        final Boolean override = isAllowAutoUpdateSchema;
+        return override != null ? override : brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 293f71d..80168b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -218,6 +218,87 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    /**
+     * Exercises the broker-level allow-auto-update-schema switch on a namespace that leaves the policy unset:
+     * an unregistered schema must be rejected while the switch is off, accepted once it is on, and the
+     * then-registered schema must stay usable after the switch is turned off again.
+     */
+    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
+    public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
+            throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "test-consumer-compatibility";
+        String namespace = "test-namespace-" + randomName(16);
+        String fqtn = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topic).toString();
+
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
+
+        assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+                SchemaCompatibilityStrategy.FULL);
+
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
+        admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+
+        // Switch off: PersonTwo differs from the registered PersonOne schema and may not be added,
+        // so creating the producer has to fail.
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+        ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
+                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtn);
+        try {
+            producerThreeBuilder.create();
+            Assert.fail("Producer creation should fail while schema auto updating is disabled.");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
+        }
+
+        // Switch on: the same producer builder must now succeed and messages must round-trip.
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
+        ConsumerBuilder<Schemas.PersonTwo> consumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
+                        SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                        (false).withSupportSchemaVersioning(true).
+                                withPojo(Schemas.PersonTwo.class).build()))
+                .subscriptionName("test")
+                .topic(fqtn);
+
+        Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
+        Consumer<Schemas.PersonTwo> consumerTwo = consumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        Message<Schemas.PersonTwo> message = consumerTwo.receive();
+
+        Schemas.PersonTwo personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        producer.close();
+        consumerTwo.close();
+
+        // Switch off again: the schema was used above, so producing and consuming must keep working.
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+
+        producer = producerThreeBuilder.create();
+        consumerTwo = consumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        message = consumerTwo.receive();
+
+        personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        consumerTwo.close();
+        producer.close();
+    }
+
     @Test(dataProvider =  "AllCheckSchemaCompatibilityStrategy")
     public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 631675f..d26c61e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -108,7 +108,7 @@ public class Policies {
     public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;
 
     @SuppressWarnings("checkstyle:MemberName")
-    public boolean is_allow_auto_update_schema = true;
+    public Boolean is_allow_auto_update_schema = null;
 
     @SuppressWarnings("checkstyle:MemberName")
     public boolean schema_validation_enforced = false;