You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/04 11:19:01 UTC
[pulsar] 13/14: Allow to configure schema compatibility policy for
system topics (#12598)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c431158362697bd0f07fb668dc872568d85321f1
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Nov 3 22:10:39 2021 -0700
Allow to configure schema compatibility policy for system topics (#12598)
(cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120)
---
conf/broker.conf | 3 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++++++
.../pulsar/broker/service/AbstractTopic.java | 8 ++++++-
.../NamespaceEventsSystemTopicServiceTest.java | 25 ++++++++++++++++++++++
4 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0144cdc..552bdb4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -520,6 +520,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false
+# The schema compatibility strategy to use for system topics
+systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
+
# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 49e0e19..5eb0a6b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1011,6 +1011,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean systemTopicEnabled = false;
@FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema compatibility strategy to use for system topics"
+ )
+ private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
+ SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;
+
+ @FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
"please enable the system topic first.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cb66734..615ec08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -92,6 +93,8 @@ public abstract class AbstractTopic implements Topic {
// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;
+
+ @Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
@@ -525,7 +528,10 @@ public abstract class AbstractTopic implements Topic {
}
protected void setSchemaCompatibilityStrategy(Policies policies) {
- if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+ schemaCompatibilityStrategy =
+ brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+ } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index b524e1a..2daca67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -19,9 +19,14 @@
package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
@@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
@@ -65,6 +71,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
}
@Test
+ public void testSchemaCompatibility() throws Exception {
+ TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+ .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+ String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ PersistentTopic topic =
+ (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName, false)
+ .join().get();
+
+ Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+ }
+
+ @Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));