You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/12/15 12:37:32 UTC

[pulsar] branch branch-2.9 updated: [Transaction] Fix transaction system topic create in loop. (#12749)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 393f95c  [Transaction] Fix transaction system topic create in loop. (#12749)
393f95c is described below

commit 393f95cd419bd579f24fc2b5f9cd0cb8f085092c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Nov 13 09:15:30 2021 +0800

    [Transaction] Fix transaction system topic create in loop. (#12749)
    
    fix https://github.com/apache/pulsar/issues/12727
    ### Motivation
    Now transaction system topic can be created.
    
    ### Modifications
    we should not allow broker or user create by transaction system format topic.
    
    1. checkout topic auto create.
    2. admin create topic.
    
    ### Verifying this change
    
    add some test for it
    
    (cherry picked from commit 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0)
---
 .../pulsar/broker/transaction/TransactionTest.java | 57 ++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 34ea362..a851c14 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -180,6 +180,63 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
+    public void testCreateTransactionSystemTopic() throws Exception {
+        String subName = "test";
+        String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+        try {
+            // init pending ack
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            Transaction transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+            consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+        topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+        // getList does not include transaction system topic
+        List<String> list = admin.topics().getList(NAMESPACE1);
+        assertEquals(list.size(), 4);
+        list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+        try {
+            // can't create transaction system topic
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            fail();
+        } catch (PulsarClientException.NotAllowedException e) {
+            assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().getSubscriptions(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createPartitionedTopic(topicName, 3);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createNonPartitionedTopic(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+    }
+
+    @Test
     public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
         String subName = "test";