You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/07 06:41:22 UTC
[pulsar] branch branch-2.7 updated: Allow to configure schema compatibility policy for system topics (#12598)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new de57f98 Allow to configure schema compatibility policy for system topics (#12598)
de57f98 is described below
commit de57f98692c64ae5a1c6d5f79f4fbb922f946cb5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Nov 3 22:10:39 2021 -0700
Allow to configure schema compatibility policy for system topics (#12598)
(cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120)
---
conf/broker.conf | 3 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++++++
.../pulsar/broker/service/AbstractTopic.java | 11 ++++++++--
.../NamespaceEventsSystemTopicFactory.java | 7 ++++++
.../NamespaceEventsSystemTopicServiceTest.java | 25 ++++++++++++++++++++++
5 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0cd4e1e..9534ed7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -455,6 +455,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false
+# The schema compatibility strategy to use for system topics
+systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
+
# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 38adecf..70f9cbe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
@@ -860,6 +861,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean systemTopicEnabled = false;
@FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema compatibility strategy to use for system topics"
+ )
+ private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
+ SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;
+
+ @FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
"please enable the system topic first.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 339e89d..d568ac2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -39,6 +40,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -84,6 +86,8 @@ public abstract class AbstractTopic implements Topic {
// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;
+
+ @Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
@@ -313,8 +317,11 @@ public abstract class AbstractTopic implements Topic {
PUBLISH_LATENCY.observe(latency, unit);
}
- protected void setSchemaCompatibilityStrategy (Policies policies) {
- if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ protected void setSchemaCompatibilityStrategy(Policies policies) {
+ if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+ schemaCompatibilityStrategy =
+ brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+ } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 911a997..266c30e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -34,6 +34,13 @@ public class NamespaceEventsSystemTopicFactory {
this.client = client;
}
+ public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
+ TopicName topicName = TopicName.get("persistent", namespaceName,
+ EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ log.info("Create topic policies system topic client {}", topicName.toString());
+ return new TopicPoliciesSystemTopicClient(client, topicName);
+ }
+
public SystemTopicClient createSystemTopic(NamespaceName namespaceName, EventType eventType) {
TopicName topicName = getSystemTopicName(namespaceName, eventType);
if (topicName != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 1363e1d..1995987 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -19,9 +19,14 @@
package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
@@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +69,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
}
@Test
+ public void testSchemaCompatibility() throws Exception {
+ TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+ .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+ String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ PersistentTopic topic =
+ (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName, false)
+ .join().get();
+
+ Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+ }
+
+ @Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
SystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory.createSystemTopic(NamespaceName.get(NAMESPACE1), EventType.TOPIC_POLICY);
TopicPolicies policies = TopicPolicies.builder()