You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/21 11:46:52 UTC

[GitHub] [pulsar] shibd opened a new pull request, #16719: [fix][client] Fix topic name subscribe error.

shibd opened a new pull request, #16719:
URL: https://github.com/apache/pulsar/pull/16719

   #16374  #16556 #16375 #16599
   
   ### Motivation
   
   This is a bug, after [[PIP-145](https://github.com/apache/pulsar/pull/16062)],  `PatternMultiTopicsConsumer` can receive  `CommandWatchTopicUpdate` from broker to subscribe new topic.
   
   But this topic name is with the partition index(case: public/default/test-topic-partition-0), This will produce unexpected behavior when executing the following subscribe method.
   
   https://github.com/apache/pulsar/blob/9c93ab45af80dbeb116bfe9b63ff579ac4e22ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L935-L942
   
   
   The reasons for these failed unit tests are:
   
   When the client receives the `CommandWatchTopicUpdate` command, it will enter the collection of processing non-partitions, then only process the [consumer](https://github.com/apache/pulsar/blob/9c93ab45af80dbeb116bfe9b63ff579ac4e22ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L79) collection and not the [partitionedTopics](https://github.com/apache/pulsar/blob/9c93ab45af80dbeb116bfe9b63ff579ac4e22ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L82) collection
   
   https://github.com/apache/pulsar/blob/9c93ab45af80dbeb116bfe9b63ff579ac4e22ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1068-L1094
   
   Then, when we execute the `consumer1.run(consumer1.getRecheckPatternTimeout())` to trigger the update subscription on the unit test, By executing the following logic, will cause `topic-4` to be removed and added at the same time. ultimately destabilizes unit tests (maybe deletes will be executed after additions)
   
   https://github.com/apache/pulsar/blob/9c93ab45af80dbeb116bfe9b63ff579ac4e22ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L115-L121
   
   Print log here:
   <img width="1332" alt="image" src="https://user-images.githubusercontent.com/33416836/180206241-db6daeea-727e-422f-9861-74f9f16b1525.png">
   
   Can See:
   
   ```
   2022-07-21T19:43:35,883 - INFO  - [pulsar-client-io-397-1:PatternMultiTopicsConsumerImpl@154] - debug add: [persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer]
   2022-07-21T19:43:35,883 - INFO  - [pulsar-client-io-397-1:PatternMultiTopicsConsumerImpl@155] - debug remove: [persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-0, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-1, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-2, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-3]
   
   ```
   
   ### Modifications
   
   - Change subscribe logic, use the `getPartitionedTopicName()`.
   
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   (Please explain why)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927508732


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -203,9 +205,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
                 return addFuture;
             }
 
+            Set<String> addTopicPartitionedName = addedTopics.stream()

Review Comment:
   Do you mean removeTopic? there is no problem with the implementation of removeTopic. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927596079


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -203,9 +205,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
                 return addFuture;
             }
 
+            Set<String> addTopicPartitionedName = addedTopics.stream()
+                    .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName())
+                    .collect(Collectors.toSet());
+
             List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size());
-            addedTopics.stream().forEach(topic -> futures.add(
-                    subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
+            addTopicPartitionedName.stream().forEach(partitionedTopic -> futures.add(

Review Comment:
   ```suggestion
               addTopicPartitionedName.forEach(partitionedTopic -> futures.add(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927412412


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java:
##########
@@ -514,6 +509,38 @@ public void testStartEmptyPatternConsumer() throws Exception {
         producer3.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
+        String key = "AutoSubscribePatternConsumer";
+        String subscriptionName = "my-ex-subscription-" + key;
+
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()

Review Comment:
   Please close the consumer after the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927438083


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java:
##########
@@ -514,6 +509,38 @@ public void testStartEmptyPatternConsumer() throws Exception {
         producer3.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
+        String key = "AutoSubscribePatternConsumer";
+        String subscriptionName = "my-ex-subscription-" + key;
+
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#issuecomment-1192303664

   > The new changes are: Inside the PatternTopicsChangedListener.onTopicsAdded method let it use partitioned name to subscribe.
   
   Make sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927597281


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java:
##########
@@ -469,11 +469,6 @@ public void testStartEmptyPatternConsumer() throws Exception {
             .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
-        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
-        NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
-                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

Review Comment:
   Why did you remove these three lines?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#issuecomment-1192298802

   > Could you add a unit test to protect this change?
   
   @BewareMyPower Thanks for your reminder, I looked again, and It is found that we cannot be modify the logic in the `MultiTopicsConsumerImpl.subscribeAsync` method. Because users may use consumer like this:
   
   ``` java
           client.newConsumer().topic(
                           "persistent://public/dafault/test-topic1-partition-0",
                           "persistent://public/dafault/test-topic2-partition-1"
            );
   ```
   If force subscription to PartitionTopic inside the method, Then all partitions of `test-topic1` and `test-topic2` will be subscribed. This is a breaking change.
   
   So, I revert the original changes.
   
   The new changes are: Inside the `PatternTopicsChangedListener.onTopicsAdded` method let it use partitioned name to subscribe.
   
   @codelipenghui @mattisonchao @Technoboy- @nodece @BewareMyPower Sorry, please help review again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nodece commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927511886


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -203,9 +205,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
                 return addFuture;
             }
 
+            Set<String> addTopicPartitionedName = addedTopics.stream()

Review Comment:
   Okay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#issuecomment-1192084264

   @andrasbeni Could you help to review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16719:
URL: https://github.com/apache/pulsar/pull/16719


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927646481


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java:
##########
@@ -469,11 +469,6 @@ public void testStartEmptyPatternConsumer() throws Exception {
             .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
-        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
-        NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
-                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

Review Comment:
   This is a PR added before trying to fix that test, it didn't work.
   
   https://github.com/apache/pulsar/pull/16599/files



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16719: [fix][client] Fix topic name subscribe error.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#issuecomment-1191387444

   @codelipenghui @gaozhangmin @Technoboy- @nodece Please help review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#issuecomment-1193048652

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nodece commented on a diff in pull request #16719: [fix][client] Fix subscription topic name error.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16719:
URL: https://github.com/apache/pulsar/pull/16719#discussion_r927474829


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -203,9 +205,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
                 return addFuture;
             }
 
+            Set<String> addTopicPartitionedName = addedTopics.stream()

Review Comment:
   Do you need to fix the https://github.com/apache/pulsar/pull/16719/files#diff-779a47f80b0048bab183870ed4c1ecd07ffd538006cf9358e494bee23405d924R179?
   
   There has the same logic with this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org