You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/12 12:02:15 UTC

[GitHub] [pulsar] massakam opened a new pull request #5185: [pulsar-broker] Fix bug that message delivery stops after resetting cursor for failover subscription

massakam opened a new pull request #5185: [pulsar-broker] Fix bug that message delivery stops after resetting cursor for failover subscription
URL: https://github.com/apache/pulsar/pull/5185
 
 
   ### Motivation
   
   Resetting the cursor for a subscription in Failover mode may cause message delivery to stop. This can be reproduced with the following procedure:
   
   1. Connect multiple consumers to a subscription in Failover mode
   1. Reset the subscription cursor to a past position
   1. Close some consumers
   1. The remaining consumers may not receive new messages from the topic
   
   At this time, the active consumer is already closed one:
   
   ```js
   "subscriptions" : {
     "sub1" : {
       "msgRateOut" : 0.0,
       "msgThroughputOut" : 0.0,
       "msgRateRedeliver" : 0.0,
       "msgBacklog" : 57604,
       "blockedSubscriptionOnUnackedMsgs" : false,
       "unackedMessages" : 0,
       "type" : "Failover",
       "activeConsumerName" : "04b6c", // This consumer is already closed!
       "msgRateExpired" : 0.0,
       "consumers" : [ {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "06317b",
         "availablePermits" : 564,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:25.413+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:36968"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "37edc",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:27.77+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38392"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "822f0",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:27.769+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38380"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "b91282",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:25.413+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38408"
       } ]
     }
   },
   ```
   
   This is because `AbstractDispatcherSingleActiveConsumer#closeFuture` is not null, so `pickAndScheduleActiveConsumer()` is not called and the active consumer does not change.
   https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L181-L184
   
   `closeFuture` becomes non-null when `disconnectAllConsumers()` is called. And once a value is assigned, it will never return to null.
   https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L217-L218
   
   `disconnectAllConsumers()` is called when unloading or deleting a topic, as well as when resetting the cursor.
   
   ### Modifications
   
   Added `resetCloseFuture()` method to the Dispatcher classes to return `closeFuture` to null when resetting cursor is completed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services