Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/14 10:58:57 UTC

[GitHub] [pulsar] massakam opened a new pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

massakam opened a new pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920


   ### Motivation
   
   Repeatedly opening and closing consumers on a Key_Shared subscription can occasionally stop dispatching to all consumers. The following are the topic stats captured when this occurred.
   
   <details>
   <summary>stats.json</summary>
   <pre>
   <code>
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "averageMsgSize" : 0.0,
     "storageSize" : 888206,
     "backlogSize" : 153264,
     "publishers" : [ ],
     "subscriptions" : {
       "sub1" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "msgBacklog" : 2324,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 1,
         "type" : "Key_Shared",
         "msgRateExpired" : 0.0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "5a9ed",
           "availablePermits" : 607,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2021-06-11T18:57:08.475+09:00",
           "clientVersion" : "2.4.2.36-yjrelease",
           "address" : "/xxx.xxx.xxx.xxx:59196"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "0e74e",
           "availablePermits" : 686,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2021-06-11T18:57:31.293+09:00",
           "clientVersion" : "2.4.2.36-yjrelease",
           "address" : "/xxx.xxx.xxx.xxx:59198"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "b8ac0",
           "availablePermits" : 952,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2021-06-11T18:57:58.618+09:00",
           "clientVersion" : "2.4.2.36-yjrelease",
           "address" : "/xxx.xxx.xxx.xxx:59188"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "43e0a",
           "availablePermits" : 1000,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2021-06-11T18:58:24.501+09:00",
           "clientVersion" : "2.4.2.36-yjrelease",
           "address" : "/xxx.xxx.xxx.xxx:59190"
         } ],
         "isReplicated" : false,
         "consumersAfterMarkDeletePosition" : {
           "43e0a" : "3860483:12549"
         }
       }
     },
     "replication" : { },
     "deduplicationStatus" : "Disabled"
   }
   </code>
   </pre>
   </details>
   
   The strange thing is that every consumer has an `unackedMessages` value of 0, but the subscription-level `unackedMessages` value is 1.
   
   ### Modifications
   
   The cause of this issue is the following code:
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L124-L125
   
   When `removeConsumer()` of the superclass is called, the pending acks owned by that consumer are added to `messagesToRedeliver`.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L184-L188
   
   However, the consumer has not yet been removed from `selector`, so the broker attempts to send messages to a consumer that has already been closed. Those messages are removed from `messagesToRedeliver`, but they are never actually delivered to any consumer.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L204-L210
   
   As a result, the mark-delete position does not advance, and all consumers get stuck.
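   To illustrate why a single lost message is enough to stall the subscription, here is a minimal Java sketch of a simplified model. It is not Pulsar's `ManagedCursor`. It assumes the mark-delete position is the last entry before the first unacknowledged one. The class `MarkDeleteSketch` and its method are hypothetical.

   ```java
   // Hypothetical, simplified model (not Pulsar's ManagedCursor): the mark-delete
   // position is the last entry such that it and every earlier entry are acknowledged.
   public class MarkDeleteSketch {

       // Returns the index of the last contiguously acked entry, or -1 if entry 0 is unacked.
       public static int markDeletePosition(boolean[] acked) {
           int pos = -1;
           for (boolean a : acked) {
               if (!a) {
                   break; // the first unacked entry pins the mark-delete position
               }
               pos++;
           }
           return pos;
       }

       public static void main(String[] args) {
           // Entry 2 was "dispatched" to a closed consumer and is never acked,
           // so acking the later entries does not help.
           boolean[] acked = {true, true, false, true, true, true};
           System.out.println(markDeletePosition(acked)); // prints 1

           // Only once entry 2 is acked can the position move forward.
           acked[2] = true;
           System.out.println(markDeletePosition(acked)); // prints 5
       }
   }
   ```

   The sketch models only the mark-delete part. It does not model how Key_Shared then holds back consumers (compare the `consumersAfterMarkDeletePosition` entry in the stats above).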
   
   Therefore, in `PersistentStickyKeyDispatcherMultipleConsumers#removeConsumer()`, we need to remove the consumer from `selector` before calling `removeConsumer()` of the superclass. 
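   The effect of the ordering can be reproduced with a toy model. The following Java sketch is hypothetical and heavily simplified. The field names echo this PR (`selector`, `messagesToRedeliver`, `pendingAcks`), but nothing here is Pulsar code. It assumes that the superclass removal redispatches synchronously, as happens on a managed-ledger cache hit. It also assumes a toy "hash" that picks a consumer by message id.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical toy model of a Key_Shared dispatcher; not Pulsar's implementation.
   public class RemoveOrderSketch {

       static class Dispatcher {
           final List<String> selector = new ArrayList<>();     // consumers eligible for selection
           final Set<String> connected = new HashSet<>();       // consumers that are still open
           final Map<String, Set<Long>> pendingAcks = new HashMap<>();
           final Deque<Long> messagesToRedeliver = new ArrayDeque<>();
           final Set<Long> delivered = new HashSet<>();          // reached a live consumer

           void addConsumer(String name) {
               selector.add(name);
               connected.add(name);
               pendingAcks.put(name, new HashSet<>());
           }

           // Stand-in for the superclass removeConsumer(): queue the consumer's pending
           // acks for redelivery, then dispatch right away on the same thread.
           void superRemoveConsumer(String name) {
               connected.remove(name);
               messagesToRedeliver.addAll(pendingAcks.remove(name));
               sendMessagesToConsumers();
           }

           void sendMessagesToConsumers() {
               while (!messagesToRedeliver.isEmpty() && !selector.isEmpty()) {
                   long msg = messagesToRedeliver.poll();
                   String target = selector.get((int) (msg % selector.size())); // toy sticky-key hash
                   if (connected.contains(target)) {
                       pendingAcks.get(target).add(msg);
                       delivered.add(msg);
                   }
                   // Otherwise the message has left messagesToRedeliver but reached nobody.
               }
           }

           void removeConsumerBuggy(String name) {
               superRemoveConsumer(name); // selector still contains the closed consumer
               selector.remove(name);
           }

           void removeConsumerFixed(String name) {
               selector.remove(name);     // remove from the selector first, as in this PR
               superRemoveConsumer(name);
           }
       }

       // Removes consumer "c1", which holds pending acks for messages 1..3, and returns the lost ones.
       public static Set<Long> lostMessages(boolean fixed) {
           Dispatcher d = new Dispatcher();
           d.addConsumer("c0");
           d.addConsumer("c1");
           d.pendingAcks.get("c1").addAll(List.of(1L, 2L, 3L));
           if (fixed) {
               d.removeConsumerFixed("c1");
           } else {
               d.removeConsumerBuggy("c1");
           }
           Set<Long> lost = new HashSet<>(List.of(1L, 2L, 3L));
           lost.removeAll(d.delivered);
           return lost;
       }

       public static void main(String[] args) {
           System.out.println("buggy order lost: " + lostMessages(false)); // [1, 3]
           System.out.println("fixed order lost: " + lostMessages(true));  // []
       }
   }
   ```

   In the buggy order, the toy selector still lists `c1` while its pending acks are being redispatched. Messages 1 and 3 hash to `c1` and are dropped. Removing `c1` from the selector first routes all three messages to `c0`.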


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920#discussion_r651299905



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,

Review comment:
       OK, I think the call stack here would look like this:
   
   `removeConsumer()`
   ↓
   `super.removeConsumer()`
   ↓
   `readMoreEntries()` (this is what triggers the redelivery of the messages that were pending on the removed consumer)
   ↓
   `readEntriesComplete()`
   ↓
   `sendMessagesToConsumers()`: as mentioned, this can select the removed consumer, which is still in the selector.
   
   At this point, `sendMessagesToConsumers()` will fail, and the message will stay in the `pendingAcks` set for that consumer. Since the consumer was already removed, this message will never be redelivered.
   
   I think this change is the correct one. 
   
   
   







[GitHub] [pulsar] massakam commented on a change in pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

massakam commented on a change in pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920#discussion_r651210993



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,

Review comment:
       @codelipenghui Perhaps this happens if the messages to be redelivered are in the managed ledger cache. In this case,
   ```
   readMoreEntries()
   ↓
   readEntriesComplete()
   ↓
   sendMessagesToConsumers()
   ```
   are executed and completed synchronously in `PersistentDispatcherMultipleConsumers#removeConsumer()`.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L184-L195
   
   `synchronized` offers no protection here: Java's intrinsic locks are reentrant, and all of these methods are executed by the same thread.
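   The `synchronized` point can be reproduced in isolation. The following Java sketch is hypothetical; its method names only echo this discussion, and none of it is Pulsar code. An already-completed `CompletableFuture` stands in for a managed-ledger cache hit. The nested `synchronized` calls happen on the same thread, so they re-enter the monitor instead of blocking. As a result, `readEntriesComplete` runs while `removeConsumer` is still in progress.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical demo: a synchronous completion re-enters synchronized methods on the same thread.
   public class ReentrancySketch {

       private final List<String> events = new ArrayList<>();
       private boolean removingConsumer = false;

       public synchronized void removeConsumer() {
           removingConsumer = true;
           events.add("removeConsumer: start");
           readMoreEntries(); // e.g. triggered to redeliver the removed consumer's messages
           removingConsumer = false;
           events.add("removeConsumer: end");
       }

       private synchronized void readMoreEntries() {
           // Stand-in for a cache hit: on an already-completed future, OpenJDK runs the
           // thenAccept callback immediately on the calling thread.
           CompletableFuture.completedFuture("entry").thenAccept(this::readEntriesComplete);
       }

       private synchronized void readEntriesComplete(String entry) {
           // The monitor is already held by this thread (reentrant), so this does not block.
           events.add("readEntriesComplete(" + entry + ") during removeConsumer=" + removingConsumer);
       }

       public synchronized List<String> events() {
           return new ArrayList<>(events);
       }

       public static void main(String[] args) {
           ReentrancySketch d = new ReentrancySketch();
           d.removeConsumer();
           d.events().forEach(System.out::println);
           // removeConsumer: start
           // readEntriesComplete(entry) during removeConsumer=true
           // removeConsumer: end
       }
   }
   ```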







[GitHub] [pulsar] merlimat merged pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

merlimat merged pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920


   





[GitHub] [pulsar] codelipenghui commented on a change in pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

codelipenghui commented on a change in pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920#discussion_r651357683



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,

Review comment:
       Oh, I see. Thanks for the explanation @massakam @merlimat. I missed that `super.removeConsumer()` can also call `readMoreEntries()`.
   
   







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10920: [broker] Fix issue where Key_Shared consumers could get stuck

codelipenghui commented on a change in pull request #10920:
URL: https://github.com/apache/pulsar/pull/10920#discussion_r651021560



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,

Review comment:
       > If the consumer has not been removed from the selector at this point
   
   Looks like a race condition between sending messages to the consumer and removing the consumer from the selector?
   
   We have `synchronized` protecting both `readEntriesComplete` and `removeConsumer`. How can this happen?



