You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/15 12:41:41 UTC

(pulsar) branch master updated: [fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebbd2f5244  [fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
4ebbd2f5244 is described below

commit 4ebbd2f5244ea2f8c0fd75e4dcb52055568b7fc7
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Jan 15 20:41:35 2024 +0800

     [fix] [broker] Fix break change: could not subscribe partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  14 +++
 .../client/impl/PatternTopicsConsumerImplTest.java | 107 +++++++++++++++++++++
 .../org/apache/pulsar/common/topics/TopicList.java |  21 +++-
 3 files changed, 139 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index eb75963061e..0e9c09d0802 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocat
 import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -55,7 +56,9 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -717,5 +720,16 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
     }
 
+    public static void reconnectAllConnections(PulsarClientImpl c) throws Exception {
+        ConnectionPool pool = c.getCnxPool();
+        Method closeAllConnections = ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{});
+        closeAllConnections.setAccessible(true);
+        closeAllConnections.invoke(pool, new Object[]{});
+    }
+
+    public void reconnectAllConnections() throws Exception {
+        reconnectAllConnections((PulsarClientImpl) pulsarClient);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index c708b4cae0a..9bcbdfed4c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
 import io.netty.util.Timeout;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -679,6 +681,111 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @DataProvider(name= "partitioned")
+    public Object[][] partitioned(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(timeOut = testTimeout, dataProvider = "partitioned")
+    public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscriptionName = "s1";
+        final Pattern pattern = Pattern.compile(String.format("%s$", topicName));
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                // Use a large discovery period so that periodic auto-discovery is effectively disabled.
+                .patternAutoDiscoveryPeriod(1000)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. Create the topic only after the pattern consumer has already subscribed.
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        // 2. Verify that the consumer subscribes to the newly created topic.
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
+            if (partitioned) {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+            } else {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
+            }
+        });
+
+        // Cleanup: close the consumer, then delete the topic created above.
+        consumer.close();
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName);
+        } else {
+            admin.topics().delete(topicName);
+        }
+    }
+
+    @Test(timeOut = 240 * 1000, dataProvider = "partitioned")
+    public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscriptionName = "s1";
+        final Pattern pattern = Pattern.compile(String.format("%s$", topicName));
+
+        // Close all ServerCnx by closing the client-side sockets so that the config change takes effect.
+        pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false);
+        reconnectAllConnections();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                // Disabling brokerSideSubscriptionPatternEvaluation also disables the topic list watcher,
+                // so set patternAutoDiscoveryPeriod to a small value.
+                .patternAutoDiscoveryPeriod(1)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. Create the topic only after the pattern consumer has already subscribed.
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        // 2. Verify that the consumer subscribes to the newly created topic.
+        // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, the await below allows three times that.
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+        Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
+            if (partitioned) {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+            } else {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
+            }
+        });
+
+        // Cleanup: close the consumer, then delete the topic created above.
+        consumer.close();
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName);
+        } else {
+            admin.topics().delete(topicName);
+        }
+        // Re-enable broker-side pattern evaluation and close all ServerCnx again so that the change takes effect.
+        pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true);
+        reconnectAllConnections();
+    }
+
     private PulsarClient createDelayWatchTopicsClient() throws Exception {
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
         return InjectedClientCnxClientBuilder.create(clientBuilder,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 250cea217ee..4c0a8d500b7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -47,13 +47,16 @@ public class TopicList {
     }
     public static List<String> filterTopics(List<String> original, Pattern topicsPattern) {
 
-        final Pattern shortenedTopicsPattern = topicsPattern.toString().contains(SCHEME_SEPARATOR)
-                ? Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : topicsPattern;
+        final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString()));
 
         return original.stream()
                 .map(TopicName::get)
+                .filter(topicName -> {
+                    String partitionedTopicName = topicName.getPartitionedTopicName();
+                    String removedScheme = SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
+                    return shortenedTopicsPattern.matcher(removedScheme).matches();
+                })
                 .map(TopicName::toString)
-                .filter(topic -> shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches())
                 .collect(Collectors.toList());
     }
 
@@ -78,4 +81,16 @@ public class TopicList {
         s1.removeAll(list2);
         return s1;
     }
+
+    private static String removeTopicDomainScheme(String originalRegexp) {
+        if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
+            return originalRegexp;
+        }
+        String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
+        if (originalRegexp.contains("^")) {
+            return String.format("^%s", removedTopicDomain);
+        } else {
+            return removedTopicDomain;
+        }
+    }
 }