You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/12 01:44:26 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Fix system topic schema not compatible bug (#17986)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 717dd1d4cbc [fix][broker] Fix system topic schema not compatible bug (#17986)
717dd1d4cbc is described below

commit 717dd1d4cbcb81442de7914cdd86abef8c849813
Author: Shen Liu <li...@126.com>
AuthorDate: Wed Oct 12 09:44:19 2022 +0800

    [fix][broker] Fix system topic schema not compatible bug (#17986)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java     |  4 ++++
 .../systopic/NamespaceEventsSystemTopicServiceTest.java     | 13 +++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1307001a720..7c8d3b24223 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -158,6 +158,10 @@ public abstract class AbstractTopic implements Topic {
         this.preciseTopicPublishRateLimitingEnable =
                 brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
         updatePublishDispatcher(Optional.empty());
+        if (isSystemTopic()) {
+            schemaCompatibilityStrategy =
+                    brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+        }
     }
 
     protected boolean isProducersExceeded() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index d81c89d859f..12f9ea26029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static org.mockito.Mockito.mock;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -90,6 +93,16 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
         Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
     }
 
+    @Test
+    public void testSystemTopicSchemaCompatibility() throws Exception {
+        TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+                .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+        String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+        SystemTopic topic = new SystemTopic(topicName, mock(ManagedLedger.class), pulsar.getBrokerService());
+
+        Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+    }
+
     @Test
     public void testSendAndReceiveNamespaceEvents() throws Exception {
         TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory