Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/18 13:54:18 UTC

[pulsar] branch master updated: Fix bug that causes delivery of messages to stop on Shared subscription (#6966)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7396f26  Fix bug that causes delivery of messages to stop on Shared subscription (#6966)
7396f26 is described below

commit 7396f26011bd3fe5d41a82f730e0cff6ee34f719
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon May 18 22:53:59 2020 +0900

    Fix bug that causes delivery of messages to stop on Shared subscription (#6966)
    
    ### Motivation
    
    If a large number of consumers are repeatedly added to or removed from a Shared subscription, message delivery to consumers may occasionally stop. Delivery does not resume until the topic is unloaded or the broker is restarted.
    
    With the broker log level set to DEBUG, the following message was logged while message delivery was stopped:
    
    > 17:17:16.425 [pulsar-io-20-6] DEBUG o.a.p.b.s.p.PersistentDispatcherMultipleConsumers - [persistent://massakam/global/test/pt16-partition-15 / sub1] Cannot schedule next read until previous one is done
    
    This means that the variable `havePendingRead` in `PersistentDispatcherMultipleConsumers` remained `true`, so no new read of entries was being scheduled:
    https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L312-L321
    
    However, the internal stats (`stats-internal`) of that topic show no actual pending read operations (`pendingReadOps` is 0 and `waitingReadOp` is `false`). The value of `havePendingRead` therefore appears to be wrong.
    ```json
    "cursors" : {
      "sub1" : {
        "markDeletePosition" : "2383570:2158",
        "readPosition" : "2383570:2159",
        "waitingReadOp" : false,
        "pendingReadOps" : 0,
        "messagesConsumedCounter" : 309909,
        "cursorLedger" : 2383013,
        "cursorLedgerLastEntry" : 775,
        "individuallyDeletedMessages" : "[]",
        "lastLedgerSwitchTimestamp" : "2020-05-13T13:10:35.721+09:00",
        "state" : "Open",
        "numberOfEntriesSinceFirstNotAckedMessage" : 1,
        "totalNonContiguousDeletedMessagesRange" : 0,
        "properties" : { }
      }
    }
    ```
    
    Further investigation showed that an `IllegalArgumentException` was thrown on the following line:
    https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L318
    
    ```
    java.lang.IllegalArgumentException: null
            at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:561)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:325)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readMoreEntries$2(PersistentDispatcherMultipleConsumers.java:255)
            at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
            at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
            at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    ```
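    The failing check comes from Guava's `Preconditions.checkArgument(boolean)`, which throws an `IllegalArgumentException` with no detail message when its condition is false. The hypothetical sketch below mimics that behaviour without the Guava dependency; the condition `numberOfEntriesToRead > 0` is an assumption inferred from the trace, not code copied from `ManagedCursorImpl`.
    
    ```java
    // Hypothetical stand-in for the argument check seen in the stack trace.
    // Assumption: the cursor requires a strictly positive number of entries.
    final class ReadArgumentCheckSketch {
    
        static void checkEntriesToRead(int numberOfEntriesToRead) {
            if (!(numberOfEntriesToRead > 0)) {
                // Same shape as Guava's checkArgument(false): no detail message
                throw new IllegalArgumentException();
            }
        }
    
        // Returns true if the check accepts the value, false if it throws.
        static boolean isAccepted(int numberOfEntriesToRead) {
            try {
                checkEntriesToRead(numberOfEntriesToRead);
                return true;
            } catch (IllegalArgumentException e) {
                return false;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(isAccepted(100)); // true
            System.out.println(isAccepted(0));   // false
            System.out.println(isAccepted(-3));  // false
        }
    }
    ```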
    
    This means that `messagesToRead` was less than or equal to 0. A value of 0 or less can be assigned to `messagesToRead` in the following part:
    https://github.com/apache/pulsar/blob/1fd1b2b440af2477f916999a67752f9f532d1620/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L254-L255
    
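    This method is not synchronized, so `totalAvailablePermits` may be updated by other threads between the two lines, both of which read the field. Therefore, even if `totalAvailablePermits` is greater than 0 when the `if` condition is evaluated, it may be 0 or less by the time `Math.min(totalAvailablePermits, readBatchSize)` is computed.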
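    The interleaving described above can be replayed deterministically. In this hypothetical sketch (not Pulsar code), each read of the shared field is modelled by an `IntSupplier` call that returns the next value from a fixed sequence, standing in for another thread changing `totalAvailablePermits` between the two reads.
    
    ```java
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.function.IntSupplier;
    
    final class PermitReadRaceSketch {
    
        // Pre-fix shape: the shared value is read twice (condition, then Math.min).
        // Returns messagesToRead, or null when no read would be scheduled.
        static Integer racyMessagesToRead(IntSupplier totalAvailablePermits, int readBatchSize) {
            if (totalAvailablePermits.getAsInt() > 0) {
                return Math.min(totalAvailablePermits.getAsInt(), readBatchSize);
            }
            return null;
        }
    
        // Post-fix shape: the shared value is read once into a local snapshot.
        static Integer snapshotMessagesToRead(IntSupplier totalAvailablePermits, int readBatchSize) {
            int currentTotalAvailablePermits = totalAvailablePermits.getAsInt();
            if (currentTotalAvailablePermits > 0) {
                return Math.min(currentTotalAvailablePermits, readBatchSize);
            }
            return null;
        }
    
        // Supplier that yields the given values in order, one value per read.
        static IntSupplier sequence(Integer... values) {
            Iterator<Integer> it = Arrays.asList(values).iterator();
            return () -> it.next();
        }
    
        public static void main(String[] args) {
            // First read sees 5 permits; before the second read they drop to 0 or below.
            System.out.println(racyMessagesToRead(sequence(5, 0), 100));     // 0
            System.out.println(racyMessagesToRead(sequence(5, -2), 100));    // -2
            System.out.println(snapshotMessagesToRead(sequence(5, 0), 100)); // 5
        }
    }
    ```
    
    The supplier only exists to make the interleaving reproducible; in the broker the field is accessed directly and the second read happens to observe another thread's update.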
    
    As a result, `messagesToRead` becomes 0 or less and `IllegalArgumentException` is thrown. By then `havePendingRead` has already been set to `true`, and because the read callback is never executed, it never returns to `false`, so message delivery stops.
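    The effect on `havePendingRead` can be modelled with a small, simplified class. Everything in it is hypothetical: the class name, the `readsIssued` counter, and the synchronous callback are assumptions for illustration. The real dispatcher passes itself as an asynchronous callback to the managed cursor, and the exception escapes a Netty scheduled task, as in the stack trace above.
    
    ```java
    final class PendingReadGuardSketch {
    
        private boolean havePendingRead = false;
        int readsIssued = 0;
    
        // Stand-in for the cursor read: validates the argument first, then
        // "completes" synchronously by running the callback.
        private void asyncReadEntriesOrWait(int numberOfEntriesToRead, Runnable callback) {
            if (numberOfEntriesToRead <= 0) {
                throw new IllegalArgumentException(); // the callback never runs
            }
            readsIssued++;
            callback.run();
        }
    
        // Returns false when the read is skipped because one is believed pending.
        boolean readMoreEntries(int messagesToRead) {
            if (havePendingRead) {
                // Corresponds to "Cannot schedule next read until previous one is done"
                return false;
            }
            havePendingRead = true; // set before the read is issued
            asyncReadEntriesOrWait(messagesToRead, () -> havePendingRead = false);
            return true;
        }
    
        boolean hasPendingRead() {
            return havePendingRead;
        }
    
        public static void main(String[] args) {
            PendingReadGuardSketch d = new PendingReadGuardSketch();
            System.out.println(d.readMoreEntries(10)); // true: read issued and completed
            try {
                d.readMoreEntries(0);                  // exception escapes before the callback
            } catch (IllegalArgumentException e) {
                // In the broker nothing resets the flag at this point either.
            }
            System.out.println(d.hasPendingRead());    // true: flag stuck
            System.out.println(d.readMoreEntries(10)); // false: every later read is skipped
        }
    }
    ```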
    
    ### Modifications
    
    The code that causes this bug was fixed by reading `totalAvailablePermits` only once, into a local variable:
    ```diff
    -if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
    -    int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
    +int currentTotalAvailablePermits = totalAvailablePermits;
    +if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
    +    int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
    ```
    
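    Furthermore, if `messagesToRead` is 0 or less, it is corrected to 1 before being passed to `ManagedCursorImpl#asyncReadEntriesOrWait()`. The same safeguard is added to `PersistentDispatcherSingleActiveConsumer` and `PersistentReplicator`.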
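    A minimal sketch of the two safeguards together, assuming a simplified read-size calculation: the shared permit count is read once, and the result is clamped with `Math.max(messagesToRead, 1)` as in the patch. The class and method names are hypothetical, and a non-positive `readBatchSize` is used only to exercise the clamp.
    
    ```java
    import java.util.OptionalInt;
    import java.util.function.IntSupplier;
    
    final class ReadSizeFixSketch {
    
        // Empty result means no read should be scheduled.
        static OptionalInt messagesToRead(IntSupplier totalAvailablePermits, int readBatchSize) {
            // Read the shared counter exactly once; later logic uses this local copy.
            int currentTotalAvailablePermits = totalAvailablePermits.getAsInt();
            if (currentTotalAvailablePermits <= 0) {
                return OptionalInt.empty();
            }
            int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
            // Defensive clamp: never ask the cursor for fewer than 1 entry.
            messagesToRead = Math.max(messagesToRead, 1);
            return OptionalInt.of(messagesToRead);
        }
    
        public static void main(String[] args) {
            System.out.println(messagesToRead(() -> 5, 100));   // OptionalInt[5]
            System.out.println(messagesToRead(() -> 500, 100)); // OptionalInt[100]
            System.out.println(messagesToRead(() -> 0, 100));   // OptionalInt.empty
            System.out.println(messagesToRead(() -> 5, 0));     // OptionalInt[1]
        }
    }
    ```
    
    The snapshot removes the specific race, while the clamp keeps any other path that produces a non-positive value from leaving `havePendingRead` stuck.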
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java | 8 ++++++--
 .../persistent/PersistentDispatcherSingleActiveConsumer.java      | 3 +++
 .../pulsar/broker/service/persistent/PersistentReplicator.java    | 3 +++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 74d131f..b93cc4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -251,8 +251,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     public void readMoreEntries() {
-        if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-            int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
+        // totalAvailablePermits may be updated by other threads
+        int currentTotalAvailablePermits = totalAvailablePermits;
+        if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+            int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
 
             if (!isConsumerWritable()) {
                 // If the connection is not currently writable, we issue the read request anyway, but for a single
@@ -314,6 +316,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 return;
             }
 
+            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+            messagesToRead = Math.max(messagesToRead, 1);
             Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
 
             if (!messagesToReplayNow.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4ad0f5a..2062744 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -437,6 +437,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                 }
             }
 
+            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+            messagesToRead = Math.max(messagesToRead, 1);
+
             // Schedule read
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index fef23e6..12fb877 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -276,6 +276,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
                 messagesToRead = 1;
             }
 
+            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+            messagesToRead = Math.max(messagesToRead, 1);
+
             // Schedule read
             if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
                 if (log.isDebugEnabled()) {