You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/07/13 07:38:04 UTC
[pulsar] branch branch-2.9 updated: cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 54b52887197 cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)
54b52887197 is described below
commit 54b52887197c9115a022c5fd74065edf9c44f0a8
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jul 12 14:36:08 2022 +0800
cherry-pick [fix][txn] fix pattern sub filter transaction system topic (#16533)
---
.../apache/pulsar/broker/service/ServerCnx.java | 3 ++
.../pendingack/PendingAckPersistentTest.java | 55 ++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0bbdaca9f74..02aa9e93c57 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1882,6 +1882,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
+ topics = topics.stream()
+ .filter(topic -> !PulsarService.isTransactionSystemTopic(TopicName.get(topic)))
+ .collect(Collectors.toList());
if (log.isDebugEnabled()) {
log.debug(
"[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index bd22ff423a9..b85f4efd133 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -645,4 +645,59 @@ public class PendingAckPersistentTest extends TransactionTestBase {
}
}
+ @Test
+ public void testGetSubPatternTopicFilterTxnInternalTopic() throws Exception {
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "testGetSubPatternTopicFilterTxnInternalTopic").toString();
+
+ int partition = 3;
+ admin.topics().createPartitionedTopic(topic, partition);
+
+ String subscriptionName = "sub";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false)
+ .topic(topic).create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .topic(topic)
+ .subscribe();
+
+ for (int i = 0; i < partition; i++) {
+ producer.send("test");
+ }
+
+ // creat pending ack managedLedger
+ for (int i = 0; i < partition; i++) {
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction);
+ transaction.commit().get();
+ }
+
+ consumer.close();
+
+ @Cleanup
+ Consumer<String> patternConsumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionName("patternSub")
+ .subscriptionType(SubscriptionType.Shared)
+ .topicsPattern("persistent://" + NAMESPACE1 + "/.*")
+ .subscribe();
+
+ for (int i = 0; i < partition; i++) {
+ producer.send("test" + i);
+ }
+
+ // can use pattern sub consume
+ for (int i = 0; i < partition; i++) {
+ patternConsumer.acknowledgeAsync(patternConsumer.receive().getMessageId());
+ }
+ patternConsumer.close();
+ producer.close();
+ }
}