Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/07 16:11:42 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

poorbarcode opened a new pull request, #17526:
URL: https://github.com/apache/pulsar/pull/17526

   ### Motivation
   
   With the transaction feature enabled, we sent and received messages while executing `admin API: unload namespace` 1000 times. Then the problem occurred: the consumer could not receive any messages, and there was no error log. We then called `admin API: get topic stats`: the response showed that only producers were registered on the topic and no consumers, while the consumer state in the client was `Ready`. In other words, the broker and the client disagree about the state of the consumer.
   
   #### Locating the problem
   
   Then we found the cause: two `PersistentTopic` instances with the same name were registered on one broker node. The consumer was registered on one of them (call it `topic-c`) and the producer on the other (call it `topic-p`). When messages are sent, the data flow looks like this:
   
   ```text
   client: producer sends a message
   
   broker: handle cmd-send
   
   broker: find the topic by name, it is "topic-p"
   
   broker: find all subscriptions registered on "topic-p"
   
   broker: found one subscription, but it has no consumers registered
   
   broker: no need to send the message to the client
   ``` 
   
   But the consumer is actually registered on the other instance, `topic-c`, so it never receives any messages.
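   A minimal, hypothetical model of the data flow above may help. It assumes a name-keyed cache like `brokerService.topics` (simplified here to map names directly to topic objects) and a made-up `ModelTopic` class in which each instance owns its own subscriptions; it is not Pulsar's real `PersistentTopic`. The producer's send is routed by name to `topic-p`, whose subscription has no consumers, while the consumer stays attached to the evicted `topic-c`:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, heavily simplified model: each topic instance owns its own subscriptions.
   class ModelTopic {
       final String label; // "topic-c" or "topic-p", only used to tell the two instances apart
       // subscription name -> consumers registered on THIS instance
       final Map<String, List<String>> subscriptions = new HashMap<>();

       ModelTopic(String label) {
           this.label = label;
       }

       // Dispatches to every consumer of every subscription; returns how many consumers got the message.
       int publish(String message) {
           int dispatched = 0;
           for (List<String> consumers : subscriptions.values()) {
               dispatched += consumers.size(); // a subscription without consumers sends nothing
           }
           return dispatched;
       }
   }

   class TwoTopicsDemo {
       // Returns {consumers reached by the producer's send, consumers registered on topic-c}.
       static int[] run() {
           String name = "persistent://public/default/t"; // hypothetical topic name
           Map<String, ModelTopic> topics = new ConcurrentHashMap<>(); // name -> instance, like the broker cache

           ModelTopic topicC = new ModelTopic("topic-c");
           topicC.subscriptions.computeIfAbsent("sub", k -> new ArrayList<>()).add("consumer-1");
           // topic-c has already been evicted from the cache, but the consumer is still attached to it

           ModelTopic topicP = new ModelTopic("topic-p");
           topicP.subscriptions.put("sub", new ArrayList<>()); // durable subscription, no consumers
           topics.put(name, topicP);

           int reached = topics.get(name).publish("hello"); // routed by name, so it lands on topic-p
           return new int[] {reached, topicC.subscriptions.get("sub").size()};
       }

       public static void main(String[] args) {
           int[] r = run();
           System.out.println("consumers reached: " + r[0] + ", consumers on topic-c: " + r[1]);
       }
   }
   ```

   The message reaches zero consumers even though one consumer is registered, because the lookup by name only ever sees `topic-p`.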
   
   #### Reproduce
   
   > *How can two topics with the same name end up registered on the same broker node?*
   
   Run `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer` concurrently. The process flow looks like this (the problem occurs at step 11):
   
   | Time | `transaction buffer recover` | `admin unload namespace` | `client create consumer` | `client create producer` |
   | ----------- | ----------- | ----------- | ----------- | ----------- |
   | 1 | TB recover |  |  |  |
   | 2 | TB recover failure | topic.unload |  |  |
   | 3 | topic.close(false) | topic.close(true) |  |  |
   | 4 | brokerService.topics.remove(topicName) |  |  |  |
   | 5 | remove topic finish |  | lookup |  |
   | 6 |  |  | create `topic-c` |  |
   | 7 |  |  | consumer registered on `topic-c` |  |
   | 8 |  | brokerService.topics.remove(topic) |  |  |
   | 9 |  | remove `topic-c` finish |  | lookup |
   | 10 |  |  |  | create `topic-p`  |
   | 11 |  |  |  | producer registered on `topic-p`  |
   
   - Each column represents an individual process, e.g. `client create consumer` or `client create producer`.
   - Multiple processes run at the same time, and all of them affect `brokerService.topics`.
   - The `Time` column only indicates the order of the steps, not the actual time.
   - The important steps are explained below:
   
   > step-3
   
   Even if the persistent topic's `isClosingOrDeleting` property has already been set to `true`, `close` can still be executed a second time; see line 1247:
   
   https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1240-L1249 
   
   Whether `close` can proceed depends on two predicates: the topic is not already closing, or `@param closeWithoutWaitingClientDisconnect` is `true`. This means `topic.close` is reentrant whenever `closeWithoutWaitingClientDisconnect` is `true`, and the implementation of `admin API: unload namespace` passes exactly `true` for this parameter.
   
   https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L723-L725
   
   So when a `transaction buffer recover` failure and `admin unload namespace` happen at the same time, and the recovery failure closes the topic before the unload does, the topic is removed from `brokerService.topics` twice.
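   The reentrancy in step 3 can be illustrated with a small, hypothetical model. `ReentrantClose` only mirrors the two predicates as described above (it is not copied from `PersistentTopic`), and `cacheRemovals` stands in for the call to `brokerService.removeTopicFromCache`. `NonReentrantClose` shows one common way to make `close` non-reentrant, an `AtomicBoolean.compareAndSet` guard; this is an assumption for illustration, not necessarily how PR #17524 implements the fix:

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   class CloseGuardDemo {
       // Hypothetical model of the guard as described: close proceeds if the topic is not
       // already closing OR the caller asked not to wait for client disconnects.
       static final class ReentrantClose {
           private boolean closingOrDeleting = false;
           final AtomicInteger cacheRemovals = new AtomicInteger();

           synchronized boolean close(boolean closeWithoutWaitingClientDisconnect) {
               if (closingOrDeleting && !closeWithoutWaitingClientDisconnect) {
                   return false; // already closing: rejected
               }
               closingOrDeleting = true;
               cacheRemovals.incrementAndGet(); // stands in for removeTopicFromCache(...)
               return true;
           }
       }

       // One way to make close non-reentrant: only the caller that flips the flag proceeds.
       static final class NonReentrantClose {
           private final AtomicBoolean closingOrDeleting = new AtomicBoolean(false);
           final AtomicInteger cacheRemovals = new AtomicInteger();

           boolean close(boolean closeWithoutWaitingClientDisconnect) {
               // The parameter may still decide whether to wait for clients, but it no longer bypasses the guard.
               if (!closingOrDeleting.compareAndSet(false, true)) {
                   return false;
               }
               cacheRemovals.incrementAndGet();
               return true;
           }
       }

       // Replays step 3: the TB recovery failure closes with false, then the unload closes with true.
       static int[] run() {
           ReentrantClose before = new ReentrantClose();
           before.close(false);
           before.close(true);

           NonReentrantClose after = new NonReentrantClose();
           after.close(false);
           after.close(true);
           return new int[] {before.cacheRemovals.get(), after.cacheRemovals.get()};
       }

       public static void main(String[] args) {
           int[] r = run();
           System.out.println("cache removals, reentrant: " + r[0] + ", non-reentrant: " + r[1]);
       }
   }
   ```

   Replaying step 3 yields two cache removals with the reentrant guard and one with the compare-and-set guard. If the two calls arrived in the opposite order (`close(true)` first), even the reentrant guard would reject the second call, which matches the ordering condition stated above.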
   
   > step-4 / step-8
   
   The current implementation of `BrokerService.removeTopicFromCache` uses `map.remove(key)` rather than `map.remove(key, value)`, so it removes whatever value is currently mapped to the key, even if that is not the topic instance being closed.
   
   https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1956
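   The effect of steps 4 and 8 can be replayed deterministically in one thread. This sketch assumes a plain `ConcurrentHashMap` from topic name to a hypothetical `TopicInstance` (the real `topics` cache holds `CompletableFuture<Optional<Topic>>` values) and uses `computeIfAbsent` as a stand-in for the lookup-and-create done by the consumer and the producer. Both removals carry a reference to the *original* instance, as the closing code paths would:

   ```java
   import java.util.concurrent.ConcurrentHashMap;

   class RemoveFromCacheDemo {
       // Hypothetical stand-in for a topic instance; only its identity matters here.
       static final class TopicInstance {
           final String label;
           TopicInstance(String label) { this.label = label; }
       }

       // Replays steps 4-11 of the table. Returns the labels of the instances that
       // the consumer and the producer end up attached to.
       static String[] replay(boolean conditionalRemove) {
           ConcurrentHashMap<String, TopicInstance> topics = new ConcurrentHashMap<>();
           String name = "persistent://public/default/t"; // hypothetical topic name
           TopicInstance original = new TopicInstance("original");
           topics.put(name, original);

           // step 4: the transaction-buffer recovery failure removes the original instance
           remove(topics, name, original, conditionalRemove);
           // steps 6-7: the consumer's lookup creates topic-c and registers on it
           TopicInstance consumerTopic = topics.computeIfAbsent(name, k -> new TopicInstance("topic-c"));
           // step 8: the unload path still holds a reference to the ORIGINAL instance
           remove(topics, name, original, conditionalRemove);
           // steps 10-11: the producer's lookup creates (or reuses) an instance
           TopicInstance producerTopic = topics.computeIfAbsent(name, k -> new TopicInstance("topic-p"));
           return new String[] {consumerTopic.label, producerTopic.label};
       }

       private static void remove(ConcurrentHashMap<String, TopicInstance> topics, String name,
                                  TopicInstance expected, boolean conditionalRemove) {
           if (conditionalRemove) {
               topics.remove(name, expected); // removes only if the mapped value is still `expected`
           } else {
               topics.remove(name); // removes whatever is currently mapped, e.g. topic-c
           }
       }

       public static void main(String[] args) {
           System.out.println("map.remove(key):        " + String.join(" / ", replay(false)));
           System.out.println("map.remove(key, value): " + String.join(" / ", replay(true)));
       }
   }
   ```

   With `map.remove(key)`, step 8 evicts `topic-c`, so the consumer and the producer end up on `topic-c` and `topic-p` respectively. With `map.remove(key, value)`, step 8 is a no-op because the mapped value is no longer the original instance, and the producer reuses `topic-c`. `ConcurrentHashMap.remove(key, value)` compares values with `equals`, which for a class that does not override it means reference identity.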
   
   To sum up, we should make these two changes:

   - Make `topic.close` non-reentrant, and also prevent `topic.close` and `topic.delete` from re-entering each other.
   - Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.
   
   ### Modifications
   
   - Make `topic.close` non-reentrant, and also prevent `topic.close` and `topic.delete` from re-entering each other.
     - fixed by PR #17524
   - Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService`.
     - fixed by this PR
   
   
   ### Documentation
   
   - [ ] `doc-required` 
     
   - [ ] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1253116677

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] codelipenghui closed pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker
URL: https://github.com/apache/pulsar/pull/17526




[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1251213461

   Resolved the conflicts and rebased onto `master`.




[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1251215288

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#discussion_r974938813


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2006,15 +2006,45 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFutureFromCache(topicNameString, null);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic
+        // in the argument. Do nothing.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {
+            Topic topicInCache = topicOptional.orElse(null);
+            // If @param topic is not equals with cached, do nothing.
+            if (topicInCache == null || topicInCache != topic){
+                return CompletableFuture.completedFuture(null);
+            } else {
+                // Do remove.
+                return removeTopicFutureFromCache(topicNameString, createTopicFuture);
+            }
+            // If the future in cache has exception complete,
+            // the topic instance in the cache is not the same with the topic.
+        }).exceptionally(ex -> null);

Review Comment:
   I have already changed the logic to avoid ignoring exceptions.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#discussion_r971017688


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2006,15 +2006,43 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFutureFromCache(topicNameString, null);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic
+        // in the argument. Do nothing.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {

Review Comment:
   This is a good point.

   I have covered this branch: if the future in the cache completed exceptionally, the topic instance in the cache cannot be the same as `@param topic`, so the removal returns `success`.
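   A simplified, synchronous sketch of the identity check discussed in this thread, including the exceptionally completed branch. The nested `Topic` class and the `removeIfSame` name are hypothetical; the cache value type follows the diff (`CompletableFuture<Optional<Topic>>`). Because only futures that are already done are inspected, the sketch reads them with `join()` instead of chaining `thenCompose`, and it returns whether anything was removed rather than a `CompletableFuture<Void>`:

   ```java
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   class IdentityCheckedRemoval {
       // Hypothetical topic type; only reference identity matters here.
       static final class Topic {}

       final ConcurrentHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentHashMap<>();

       // Removes the cache entry for `name` only if it holds exactly `expected`; returns whether it did.
       boolean removeIfSame(String name, Topic expected) {
           CompletableFuture<Optional<Topic>> future = topics.get(name);
           if (future == null || !future.isDone()) {
               return false; // absent, or still being created: cannot be `expected`
           }
           if (future.isCompletedExceptionally()) {
               // Creation failed (or was cancelled), so this entry never held `expected`.
               // A log line could go here, as suggested in the review.
               return false;
           }
           Topic cached = future.join().orElse(null);
           if (cached == null || cached != expected) {
               return false; // a different instance (or none) now owns the name
           }
           // Conditional remove: if another future replaced this one meanwhile, leave it alone.
           return topics.remove(name, future);
       }

       public static void main(String[] args) {
           IdentityCheckedRemoval cache = new IdentityCheckedRemoval();
           Topic current = new Topic();
           cache.topics.put("t", CompletableFuture.completedFuture(Optional.of(current)));
           System.out.println(cache.removeIfSame("t", new Topic())); // stale instance: false
           System.out.println(cache.removeIfSame("t", current));     // same instance: true
       }
   }
   ```

   A failed or still-pending creation future can never hold the instance being closed, so this method leaves such entries untouched and reports that nothing was removed.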
   
   





[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1240044132

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#discussion_r974835112


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2006,15 +2006,45 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFutureFromCache(topicNameString, null);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic
+        // in the argument. Do nothing.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {
+            Topic topicInCache = topicOptional.orElse(null);
+            // If @param topic is not equals with cached, do nothing.
+            if (topicInCache == null || topicInCache != topic){
+                return CompletableFuture.completedFuture(null);
+            } else {
+                // Do remove.
+                return removeTopicFutureFromCache(topicNameString, createTopicFuture);
+            }
+            // If the future in cache has exception complete,
+            // the topic instance in the cache is not the same with the topic.
+        }).exceptionally(ex -> null);

Review Comment:
   It would be better to add a log here.





[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1239648170

   This PR should be merged into these branches:
   - `branch-2.8`
   - `branch-2.9`
   - `branch-2.10`
   - `branch-2.11`
   - `master`




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#discussion_r970913519


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2006,15 +2006,43 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFutureFromCache(topicNameString, null);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic
+        // in the argument. Do nothing.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {

Review Comment:
   Might `createTopicFuture` be completed exceptionally?





[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1246981041

   Rebased onto `master`.




[GitHub] [pulsar] codelipenghui commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1250915273

   @poorbarcode Please resolve the conflicts




[GitHub] [pulsar] codelipenghui merged pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17526:
URL: https://github.com/apache/pulsar/pull/17526




[GitHub] [pulsar] poorbarcode commented on pull request #17526: [fix][broker]Consumer can't consume messages because there has two sames topics in one broker

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17526:
URL: https://github.com/apache/pulsar/pull/17526#issuecomment-1247404193

   /pulsarbot rerun-failure-checks

