You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:49:02 UTC

[pulsar] 12/14: [fix][txn]: fix cannot enable transaction when allow auto update schema is disabled (#14809)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e397e1e4c4b36701b8b11b7164c4481a16f359fb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Mar 24 15:49:01 2022 +0800

    [fix][txn]: fix cannot enable transaction when allow auto update schema is disabled (#14809)
    
    (cherry picked from commit b5b0967f12174ba35baaf25092ac521f281e6b7d)
---
 .../pulsar/broker/service/AbstractTopic.java       |  3 +++
 .../pulsar/broker/transaction/TransactionTest.java | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cc29202..caa0ea2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -356,6 +356,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     private boolean allowAutoUpdateSchema() {
+        if (brokerService.isSystemTopic(topic)) {
+            return true;
+        }
         if (isAllowAutoUpdateSchema == null) {
             return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 308fa35..6fd1e92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -89,12 +90,14 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -850,4 +853,32 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
 
     }
+
+    @Test
+    public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
+        // System topics should still get a schema while broker-level auto update schema is disabled.
+        String namespace = TENANT + "/ns2";
+        String topic = namespace + "/test";
+        pulsarServiceList.forEach((pulsarService ->
+                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false)));
+        try {
+            admin.namespaces().createNamespace(namespace);
+            admin.topics().createNonPartitionedTopic(topic);
+            NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
+            TopicName snapshotTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                    namespaceName, EventType.TRANSACTION_BUFFER_SNAPSHOT);
+            TopicName topicPolicyTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                    namespaceName, EventType.TOPIC_POLICY);
+            Awaitility.await().untilAsserted(() -> {
+                SchemaInfo snapshotSchema = admin.schemas().getSchemaInfo(snapshotTopicName.toString());
+                Assert.assertNotNull(snapshotSchema);
+                SchemaInfo topicPolicySchema = admin.schemas().getSchemaInfo(topicPolicyTopicName.toString());
+                Assert.assertNotNull(topicPolicySchema);
+            });
+        } finally {
+            // Re-enable the setting even when the test fails, so the disabled flag never outlives it.
+            pulsarServiceList.forEach((pulsarService ->
+                    pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
+        }
+    }
 }
\ No newline at end of file