You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/07/13 07:38:04 UTC

[pulsar] branch branch-2.9 updated: cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 54b52887197 cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)
54b52887197 is described below

commit 54b52887197c9115a022c5fd74065edf9c44f0a8
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jul 12 14:36:08 2022 +0800

    cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 ++
 .../pendingack/PendingAckPersistentTest.java       | 55 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0bbdaca9f74..02aa9e93c57 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1882,6 +1882,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 if (isAuthorized) {
                     getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
                         .thenAccept(topics -> {
+                            topics = topics.stream()
+                                    .filter(topic -> !PulsarService.isTransactionSystemTopic(TopicName.get(topic)))
+                                    .collect(Collectors.toList());
                             if (log.isDebugEnabled()) {
                                 log.debug(
                                         "[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index bd22ff423a9..b85f4efd133 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -645,4 +645,59 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         }
     }
 
+    /** A pattern subscription must still consume once transactional acks have created pending-ack ledgers. */
+    @Test
+    public void testGetSubPatternTopicFilterTxnInternalTopic() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "testGetSubPatternTopicFilterTxnInternalTopic").toString();
+
+        int partition = 3;
+        admin.topics().createPartitionedTopic(topic, partition);
+
+        String subscriptionName = "sub";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic).create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic)
+                .subscribe();
+
+        for (int i = 0; i < partition; i++) {
+            producer.send("test");
+        }
+
+        // create pending ack managedLedger
+        for (int i = 0; i < partition; i++) {
+            Transaction transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build()
+                    .get();
+            // block on the ack so a failed transactional ack fails the test instead of being dropped
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+            transaction.commit().get();
+        }
+
+        consumer.close();
+
+        @Cleanup
+        Consumer<String> patternConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName("patternSub")
+                .subscriptionType(SubscriptionType.Shared)
+                .topicsPattern("persistent://" + NAMESPACE1 + "/.*")
+                .subscribe();
+
+        for (int i = 0; i < partition; i++) {
+            producer.send("test" + i);
+        }
+
+        // the pattern subscription must be able to receive and ack every message
+        for (int i = 0; i < partition; i++) {
+            patternConsumer.acknowledgeAsync(patternConsumer.receive().getMessageId()).get();
+        }
+    }
 }