You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/12 01:44:26 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] Fix system topic schema not compatible bug (#17986)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 717dd1d4cbc [fix][broker] Fix system topic schema not compatible bug (#17986)
717dd1d4cbc is described below
commit 717dd1d4cbcb81442de7914cdd86abef8c849813
Author: Shen Liu <li...@126.com>
AuthorDate: Wed Oct 12 09:44:19 2022 +0800
[fix][broker] Fix system topic schema not compatible bug (#17986)
---
.../org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++++
.../systopic/NamespaceEventsSystemTopicServiceTest.java | 13 +++++++++++++
2 files changed, 17 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1307001a720..7c8d3b24223 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -158,6 +158,10 @@ public abstract class AbstractTopic implements Topic {
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(Optional.empty());
+ if (isSystemTopic()) {
+ schemaCompatibilityStrategy =
+ brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+ }
}
protected boolean isProducersExceeded() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index d81c89d859f..12f9ea26029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.broker.systopic;
+import static org.mockito.Mockito.mock;
import com.google.common.collect.Sets;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -90,6 +93,16 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
}
+ @Test
+ public void testSystemTopicSchemaCompatibility() throws Exception {
+ TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+ .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+ String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+ SystemTopic topic = new SystemTopic(topicName, mock(ManagedLedger.class), pulsar.getBrokerService());
+
+ Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+ }
+
@Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory