You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/04 11:19:01 UTC

[pulsar] 13/14: Allow to configure schema compatibility policy for system topics (#12598)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c431158362697bd0f07fb668dc872568d85321f1
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Nov 3 22:10:39 2021 -0700

    Allow to configure schema compatibility policy for system topics (#12598)
    
    (cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120)
---
 conf/broker.conf                                   |  3 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++++++
 .../pulsar/broker/service/AbstractTopic.java       |  8 ++++++-
 .../NamespaceEventsSystemTopicServiceTest.java     | 25 ++++++++++++++++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0144cdc..552bdb4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -520,6 +520,9 @@ zookeeperSessionExpiredPolicy=shutdown
 # Enable or disable system topic
 systemTopicEnabled=false
 
+# The schema compatibility strategy to use for system topics
+systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
+
 # Enable or disable topic level policies, topic level policies depends on the system topic
 # Please enable the system topic first.
 topicLevelPoliciesEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 49e0e19..5eb0a6b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1011,6 +1011,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean systemTopicEnabled = false;
 
     @FieldContext(
+            category = CATEGORY_SCHEMA,
+            doc = "The schema compatibility strategy to use for system topics"
+    )
+    private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
+            SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
                 "please enable the system topic first.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cb66734..615ec08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -92,6 +93,8 @@ public abstract class AbstractTopic implements Topic {
 
     // Whether messages published must be encrypted or not in this topic
     protected volatile boolean isEncryptionRequired = false;
+
+    @Getter
     protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
             SchemaCompatibilityStrategy.FULL;
     protected volatile boolean isAllowAutoUpdateSchema = true;
@@ -525,7 +528,10 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected void setSchemaCompatibilityStrategy(Policies policies) {
-        if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
+        if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+            schemaCompatibilityStrategy =
+                    brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+        } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
             schemaCompatibilityStrategy = brokerService.pulsar()
                     .getConfig().getSchemaCompatibilityStrategy();
             if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index b524e1a..2daca67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -19,9 +19,14 @@
 package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
@@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.slf4j.Logger;
@@ -65,6 +71,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
     }
 
     @Test
+    public void testSchemaCompatibility() throws Exception {
+        TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+                .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+        String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
+                .topic(topicName)
+                .startMessageId(MessageId.earliest)
+                .create();
+        // Reader is created first; getTopic(..., false) presumably does not create a missing topic (confirm).
+        PersistentTopic topic =
+                (PersistentTopic) pulsar.getBrokerService()
+                        .getTopic(topicName, false)
+                        .join().get();
+        // Assuming TestNG's Assert: argument order is (actual, expected), so failures report values correctly.
+        Assert.assertEquals(topic.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+    }
+
+    @Test
     public void testSendAndReceiveNamespaceEvents() throws Exception {
         TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
                 .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));