You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/15 12:41:41 UTC
(pulsar) branch master updated: [fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebbd2f5244 [fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
4ebbd2f5244 is described below
commit 4ebbd2f5244ea2f8c0fd75e4dcb52055568b7fc7
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Jan 15 20:41:35 2024 +0800
[fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
---
.../broker/auth/MockedPulsarServiceBaseTest.java | 14 +++
.../client/impl/PatternTopicsConsumerImplTest.java | 107 +++++++++++++++++++++
.../org/apache/pulsar/common/topics/TopicList.java | 21 +++-
3 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index eb75963061e..0e9c09d0802 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocat
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
@@ -55,7 +56,9 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -717,5 +720,16 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
}
+ public static void reconnectAllConnections(PulsarClientImpl c) throws Exception { // Test helper: closes every connection held by the client's connection pool; presumably the client then re-establishes them on demand (hence the name) - confirm.
+ ConnectionPool pool = c.getCnxPool();
+ Method closeAllConnections = ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{}); // looked up reflectively; presumably the method is not public, hence setAccessible below
+ closeAllConnections.setAccessible(true);
+ closeAllConnections.invoke(pool, new Object[]{}); // no-argument call on the client's pool; any failure propagates to the caller
+ }
+
+ public void reconnectAllConnections() throws Exception { // Convenience overload that applies the static helper to this test's pulsarClient.
+ reconnectAllConnections((PulsarClientImpl) pulsarClient); // the cast assumes pulsarClient is a PulsarClientImpl; otherwise it throws ClassCastException
+ }
+
private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index c708b4cae0a..9bcbdfed4c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import java.util.regex.Pattern;
import java.util.stream.IntStream;
import io.netty.util.Timeout;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -679,6 +681,111 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
}
}
+ @DataProvider(name= "partitioned")
+ public Object[][] partitioned(){ // Supplies the boolean "partitioned" flag: a test using this provider runs once per row below.
+ return new Object[][]{
+ {true}, // run against a partitioned topic
+ {false} // run against a non-partitioned topic
+ };
+ }
+
+ @Test(timeOut = testTimeout, dataProvider = "partitioned")
+ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception { // Verifies that a pattern made of the exact topic name plus "$" subscribes to a topic created after the consumer, partitioned or not.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscriptionName = "s1";
+ final Pattern pattern = Pattern.compile(String.format("%s$", topicName)); // NOTE(review): topicName is not Pattern.quote()d, so any regex metacharacter in it would be interpreted
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ // Use a large auto-discovery period so that periodic discovery is effectively disabled for this test (unit presumably seconds - confirm).
+ .patternAutoDiscoveryPeriod(1000)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ // 1. Create the topic only after the pattern consumer has subscribed.
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topicName, 1);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ // 2. Verify that the consumer discovers and subscribes to the newly created topic.
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
+ if (partitioned) { // only a partitioned topic is expected to show up in getPartitionedTopics()
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+ } else {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
+ }
+ });
+
+ // Cleanup: close the consumer, then delete the topic.
+ consumer.close();
+ if (partitioned) {
+ admin.topics().deletePartitionedTopic(topicName);
+ } else {
+ admin.topics().delete(topicName);
+ }
+ }
+
+ @Test(timeOut = 240 * 1000, dataProvider = "partitioned") // 240s: longer than the 3-minute Awaitility wait below
+ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception { // Verifies that an exact-name "$" pattern subscribes to a later-created topic when broker-side pattern evaluation is switched off.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscriptionName = "s1";
+ final Pattern pattern = Pattern.compile(String.format("%s$", topicName)); // NOTE(review): topicName is not Pattern.quote()d, so any regex metacharacter in it would be interpreted
+
+ // Close all ServerCnx instances by closing the client-side sockets so that the config change takes effect.
+ pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false);
+ reconnectAllConnections();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ // Disabling brokerSideSubscriptionPatternEvaluation also disables the topic list watcher,
+ // so set patternAutoDiscoveryPeriod to a small value and rely on periodic discovery instead.
+ .patternAutoDiscoveryPeriod(1)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ // 1. Create the topic only after the pattern consumer has subscribed.
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topicName, 1);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ // 2. Verify that the consumer discovers and subscribes to the newly created topic.
+ // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, wait up to three times that (3 minutes); the 240s test timeout leaves headroom.
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+ Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
+ if (partitioned) { // only a partitioned topic is expected to show up in getPartitionedTopics()
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+ } else {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
+ }
+ });
+
+ // Cleanup: close the consumer, then delete the topic.
+ consumer.close();
+ if (partitioned) {
+ admin.topics().deletePartitionedTopic(topicName);
+ } else {
+ admin.topics().delete(topicName);
+ }
+ // Restore the broker flag and close all ServerCnx instances (via the client-side sockets) so that the change takes effect. NOTE(review): not in a finally block, so a failure above leaves the flag disabled for later tests.
+ pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true);
+ reconnectAllConnections();
+ }
+
private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 250cea217ee..4c0a8d500b7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -47,13 +47,16 @@ public class TopicList {
}
public static List<String> filterTopics(List<String> original, Pattern topicsPattern) { // Returns the topics of "original" whose name, with the scheme part removed, fully matches topicsPattern (itself stripped of its scheme part).
- final Pattern shortenedTopicsPattern = topicsPattern.toString().contains(SCHEME_SEPARATOR)
- ? Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : topicsPattern;
+ final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString())); // NOTE(review): recompiling from toString() drops any flags set on topicsPattern, now even when it has no scheme to strip
return original.stream()
.map(TopicName::get)
+ .filter(topicName -> {
+ String partitionedTopicName = topicName.getPartitionedTopicName(); // match on the partitioned topic name (presumably without a "-partition-N" suffix - confirm) so a "$"-anchored pattern can still match partitions
+ String removedScheme = SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1]; // keep only the segment that follows the scheme separator
+ return shortenedTopicsPattern.matcher(removedScheme).matches(); // matches() requires the whole name to match, not just a substring
+ })
.map(TopicName::toString)
- .filter(topic -> shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches())
.collect(Collectors.toList());
}
@@ -78,4 +81,16 @@ public class TopicList {
s1.removeAll(list2);
return s1;
}
+
+ private static String removeTopicDomainScheme(String originalRegexp) { // Removes the part of a topics regexp that precedes SCHEME_SEPARATOR, re-adding "^" when the original regexp contained one.
+ if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) { // NOTE(review): originalRegexp is already a String, so toString() is redundant
+ return originalRegexp; // no scheme separator: nothing to strip
+ }
+ String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1]; // NOTE(review): [1] is only the segment after the first separator; anything after a second separator would be dropped
+ if (originalRegexp.contains("^")) { // NOTE(review): true for a "^" anywhere (e.g. inside "[^...]"), not only for a leading anchor
+ return String.format("^%s", removedTopicDomain); // put back the anchor, which presumably sat before the scheme and was cut off with it
+ } else {
+ return removedTopicDomain;
+ }
+ }
}