You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:49:02 UTC
[pulsar] 12/14: [fix][txn]: fix transactions cannot be enabled when auto-update schema is disabled (#14809)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e397e1e4c4b36701b8b11b7164c4481a16f359fb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Mar 24 15:49:01 2022 +0800
[fix][txn]: fix transactions cannot be enabled when auto-update schema is disabled (#14809)
(cherry picked from commit b5b0967f12174ba35baaf25092ac521f281e6b7d)
---
.../pulsar/broker/service/AbstractTopic.java | 3 +++
.../pulsar/broker/transaction/TransactionTest.java | 31 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cc29202..caa0ea2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -356,6 +356,9 @@ public abstract class AbstractTopic implements Topic {
}
private boolean allowAutoUpdateSchema() {
+ if (brokerService.isSystemTopic(topic)) {
+ return true;
+ }
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 308fa35..6fd1e92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -89,12 +90,14 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -850,4 +853,32 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
}
+
+ @Test
+ public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
+ String namespace = TENANT + "/ns2";
+ String topic = namespace + "/test";
+ pulsarServiceList.forEach((pulsarService ->
+ pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false)));
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+ TopicName transactionBufferTopicName =
+ NamespaceEventsSystemTopicFactory.getSystemTopicName(
+ TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
+ TopicName transactionBufferTopicName1 =
+ NamespaceEventsSystemTopicFactory.getSystemTopicName(
+ TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
+ Awaitility.await().untilAsserted(() -> {
+ SchemaInfo schemaInfo = admin
+ .schemas()
+ .getSchemaInfo(transactionBufferTopicName.toString());
+ Assert.assertNotNull(schemaInfo);
+ SchemaInfo schemaInfo1 = admin
+ .schemas()
+ .getSchemaInfo(transactionBufferTopicName1.toString());
+ Assert.assertNotNull(schemaInfo1);
+ });
+ pulsarServiceList.forEach((pulsarService ->
+ pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
+ }
}
\ No newline at end of file