Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/15 09:55:13 UTC

[GitHub] [pulsar] massakam opened a new pull request #6966: [broker] Fix bug that causes delivery of messages to stop on Shared subscription

massakam opened a new pull request #6966:
URL: https://github.com/apache/pulsar/pull/6966


   ### Motivation
   
   If a large number of consumers are repeatedly added to or removed from a Shared subscription, message delivery to consumers may occasionally stop. Delivery does not resume until the topic is unloaded or the broker is restarted.
   
   When I set the broker's log level to DEBUG, the following message was logged while delivery was stopped.
   
   > 17:17:16.425 [pulsar-io-20-6] DEBUG o.a.p.b.s.p.PersistentDispatcherMultipleConsumers - [persistent://massakam/global/test/pt16-partition-15 / sub1] Cannot schedule next read until previous one is done
   
   This means that the variable `havePendingRead` in `PersistentDispatcherMultipleConsumers` remains true, so the reading of new entries is skipped.
   https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L312-L321
   
   However, as far as I can see from the stats-internal output of that topic, there are no actual pending read operations. Therefore, the value of `havePendingRead` seems to be wrong.
   ```json
   "cursors" : {
     "sub1" : {
       "markDeletePosition" : "2383570:2158",
       "readPosition" : "2383570:2159",
       "waitingReadOp" : false,
       "pendingReadOps" : 0,
       "messagesConsumedCounter" : 309909,
       "cursorLedger" : 2383013,
       "cursorLedgerLastEntry" : 775,
       "individuallyDeletedMessages" : "[]",
       "lastLedgerSwitchTimestamp" : "2020-05-13T13:10:35.721+09:00",
       "state" : "Open",
       "numberOfEntriesSinceFirstNotAckedMessage" : 1,
       "totalNonContiguousDeletedMessagesRange" : 0,
       "properties" : { }
     }
   }
   ```
   
   As a result of the investigation, I found that `IllegalArgumentException` was thrown on the following line:
   https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L318
   
   ```
   java.lang.IllegalArgumentException: null
           at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
           at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:561)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:325)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readMoreEntries$2(PersistentDispatcherMultipleConsumers.java:255)
           at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
           at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
           at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
           at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
           at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
           at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   This means that the value of `messagesToRead` is less than or equal to 0. A value of 0 or less may be assigned to `messagesToRead` in the following part:
   https://github.com/apache/pulsar/blob/1fd1b2b440af2477f916999a67752f9f532d1620/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L254-L255
   
   This method is not synchronized, so `totalAvailablePermits` may be updated by other threads. Therefore, even if `totalAvailablePermits` is greater than 0 on the first line, it may be 0 or less on the second line.
   
   As a result, `messagesToRead` becomes 0 or less and `IllegalArgumentException` is thrown. When this happens, the callback method is never executed, so `havePendingRead` will never return to false, and message delivery will stop.
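
   The failure mode described above can be illustrated with a minimal sketch. It is not the broker's code: `StuckFlagSketch`, `asyncRead`, and `hasPendingRead` are hypothetical names, and the sketch assumes the flag is set before the read is issued and cleared only in the read callback.

   ```java
   public class StuckFlagSketch {
       // Hypothetical stand-in for the dispatcher's havePendingRead flag.
       private boolean havePendingRead = false;

       // Hypothetical stand-in for a cursor read that validates its argument
       // before doing any work, in the way Preconditions.checkArgument does.
       private static void asyncRead(int numberOfEntries, Runnable onComplete) {
           if (numberOfEntries <= 0) {
               throw new IllegalArgumentException();
           }
           onComplete.run(); // completes immediately to keep the sketch short
       }

       // Returns true if a read was issued, false if it was skipped.
       public boolean readMoreEntries(int messagesToRead) {
           if (havePendingRead) {
               return false; // "Cannot schedule next read until previous one is done"
           }
           havePendingRead = true;
           // If asyncRead throws, the callback that resets the flag never runs.
           asyncRead(messagesToRead, () -> havePendingRead = false);
           return true;
       }

       public boolean hasPendingRead() {
           return havePendingRead;
       }

       public static void main(String[] args) {
           StuckFlagSketch dispatcher = new StuckFlagSketch();
           dispatcher.readMoreEntries(5);
           System.out.println("pending after valid read: " + dispatcher.hasPendingRead());
           try {
               dispatcher.readMoreEntries(0);
           } catch (IllegalArgumentException e) {
               // Swallowed here so the sketch can continue.
           }
           System.out.println("pending after failed read: " + dispatcher.hasPendingRead());
           System.out.println("next read issued: " + dispatcher.readMoreEntries(5));
       }
   }
   ```

   Once the invalid read has thrown, every later call returns early because the flag is still set, which matches the repeated DEBUG message quoted earlier.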
   
   ### Modifications
   
   Fixed the part that causes this bug as follows:
   ```diff
   -if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
   -    int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
   +int currentTotalAvailablePermits = totalAvailablePermits;
   +if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
   +    int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
   ```
   
   Furthermore, if `messagesToRead` is 0 or less, correct it to 1 before passing it to `ManagedCursorImpl#asyncReadEntriesOrWait()`.
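
   The two changes can be sketched in isolation as follows. This is a simplified illustration, not the patched dispatcher: `PermitSnapshotSketch`, `setTotalAvailablePermits`, and `computeMessagesToRead` are hypothetical names, the field is assumed to be written by other threads, and the consumer-availability check is omitted.

   ```java
   public class PermitSnapshotSketch {
       // Hypothetical field; assumed to be updated concurrently by other threads.
       private volatile int totalAvailablePermits;
       private final int readBatchSize;

       public PermitSnapshotSketch(int totalAvailablePermits, int readBatchSize) {
           this.totalAvailablePermits = totalAvailablePermits;
           this.readBatchSize = readBatchSize;
       }

       public void setTotalAvailablePermits(int permits) {
           this.totalAvailablePermits = permits;
       }

       // Returns the number of entries to request, or 0 if no read should be issued.
       public int computeMessagesToRead() {
           // Read the shared field once so the check and the computation
           // below both see the same value.
           int currentTotalAvailablePermits = totalAvailablePermits;
           if (currentTotalAvailablePermits > 0) {
               int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
               // Defensive clamp: never pass 0 or less to the read call.
               return Math.max(messagesToRead, 1);
           }
           return 0;
       }

       public static void main(String[] args) {
           PermitSnapshotSketch sketch = new PermitSnapshotSketch(150, 100);
           System.out.println(sketch.computeMessagesToRead());
           sketch.setTotalAvailablePermits(-3);
           System.out.println(sketch.computeMessagesToRead());
       }
   }
   ```

   Copying the field into a local variable does not make the method synchronized; it only guarantees that the value tested in the `if` is the same value passed to `Math.min`.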


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #6966: [broker] Fix bug that causes delivery of messages to stop on Shared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #6966:
URL: https://github.com/apache/pulsar/pull/6966


   





[GitHub] [pulsar] massakam commented on pull request #6966: [broker] Fix bug that causes delivery of messages to stop on Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #6966:
URL: https://github.com/apache/pulsar/pull/6966#issuecomment-629921280


   @jiazhai Okay. Changed the release label to 2.5.3.





[GitHub] [pulsar] jiazhai commented on pull request #6966: [broker] Fix bug that causes delivery of messages to stop on Shared subscription

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #6966:
URL: https://github.com/apache/pulsar/pull/6966#issuecomment-629914223


   @massakam Since 2.5.2 has been sent out for a vote, could we add it to 2.5.3?

