Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/18 13:54:18 UTC
[pulsar] branch master updated: Fix bug that causes delivery of messages to stop on Shared subscription (#6966)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7396f26 Fix bug that causes delivery of messages to stop on Shared subscription (#6966)
7396f26 is described below
commit 7396f26011bd3fe5d41a82f730e0cff6ee34f719
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon May 18 22:53:59 2020 +0900
Fix bug that causes delivery of messages to stop on Shared subscription (#6966)
### Motivation
If a large number of consumers are repeatedly added to or removed from a Shared subscription, message delivery to consumers may occasionally stop. Delivery does not resume until the topic is unloaded or the broker is restarted.
With the broker's log level set to DEBUG, the following message was logged while message delivery was stopped:
> 17:17:16.425 [pulsar-io-20-6] DEBUG o.a.p.b.s.p.PersistentDispatcherMultipleConsumers - [persistent://massakam/global/test/pt16-partition-15 / sub1] Cannot schedule next read until previous one is done
This means that the variable `havePendingRead` in `PersistentDispatcherMultipleConsumers` remained `true`, so reads of new entries were being skipped:
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L312-L321
However, the internal stats of that topic (`stats-internal`) show no pending read operations on the cursor (`waitingReadOp` is `false` and `pendingReadOps` is `0`). Therefore, the value of `havePendingRead` appears to be wrong.
```json
"cursors" : {
  "sub1" : {
    "markDeletePosition" : "2383570:2158",
    "readPosition" : "2383570:2159",
    "waitingReadOp" : false,
    "pendingReadOps" : 0,
    "messagesConsumedCounter" : 309909,
    "cursorLedger" : 2383013,
    "cursorLedgerLastEntry" : 775,
    "individuallyDeletedMessages" : "[]",
    "lastLedgerSwitchTimestamp" : "2020-05-13T13:10:35.721+09:00",
    "state" : "Open",
    "numberOfEntriesSinceFirstNotAckedMessage" : 1,
    "totalNonContiguousDeletedMessagesRange" : 0,
    "properties" : { }
  }
}
```
On further investigation, I found that an `IllegalArgumentException` was being thrown at the following line:
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L318
```
java.lang.IllegalArgumentException: null
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
    at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:561)
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:325)
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readMoreEntries$2(PersistentDispatcherMultipleConsumers.java:255)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
Since `asyncReadEntriesOrWait()` rejects a non-positive number of entries through `checkArgument`, this means that `messagesToRead` was 0 or less. Such a value can be assigned to `messagesToRead` in the following code:
https://github.com/apache/pulsar/blob/1fd1b2b440af2477f916999a67752f9f532d1620/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L254-L255
Because this method is not synchronized, `totalAvailablePermits` can be updated by other threads between these two lines. Even if `totalAvailablePermits` is greater than 0 when the `if` condition is evaluated, it may already be 0 or less when `Math.min()` reads it again on the next line.
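The interleaving can be reproduced deterministically without real threads. The sketch below is a hypothetical, single-threaded model, not the broker's code: the class `PermitRace`, the method `messagesToReadBuggy`, the `Runnable` hook that stands in for another thread, and the batch size of 100 are all assumptions for illustration. The hook runs between the check and the second read of the field, which is exactly the window described above.

```java
import java.util.OptionalInt;

/**
 * Hypothetical, single-threaded model of the check-then-act race in readMoreEntries().
 * Not the Pulsar implementation: a Runnable hook stands in for another thread that
 * updates the shared field between the check and the computation.
 */
public class PermitRace {
    int totalAvailablePermits;      // shared field in the real dispatcher
    final int readBatchSize = 100;  // assumed value for illustration

    PermitRace(int permits) {
        this.totalAvailablePermits = permits;
    }

    /** Buggy pattern: the field is read once for the check and again for Math.min(). */
    OptionalInt messagesToReadBuggy(Runnable concurrentUpdate) {
        if (totalAvailablePermits > 0) {   // first read of the field
            concurrentUpdate.run();        // another thread may run here
            return OptionalInt.of(Math.min(totalAvailablePermits, readBatchSize)); // second read
        }
        return OptionalInt.empty();        // no read scheduled
    }

    public static void main(String[] args) {
        PermitRace d = new PermitRace(5);
        // Simulate another thread dropping the permits to 0 after the check has passed.
        OptionalInt n = d.messagesToReadBuggy(() -> d.totalAvailablePermits = 0);
        System.out.println("messagesToRead = " + n.getAsInt()); // prints "messagesToRead = 0"
    }
}
```

The check saw 5 permits, but the value actually used is 0, which is the kind of argument that `asyncReadEntriesOrWait()` rejects.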
As a result, `messagesToRead` becomes 0 or less and `IllegalArgumentException` is thrown. When this happens, the read callback is never executed, so `havePendingRead`, which was set to `true` before the read was issued, never returns to `false`, and message delivery stops.
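This failure mode can be modeled in a few lines. In this hypothetical sketch, `FakeCursor` stands in for `ManagedCursorImpl` and validates its argument before scheduling anything (mirroring the `checkArgument` call in the stack trace), and the `havePendingRead` flag is cleared only by the read callback. Apart from `havePendingRead`, `readMoreEntries`, and `asyncReadEntriesOrWait`, all names are assumptions, and the signatures are simplified; this is not the Pulsar or BookKeeper API.

```java
import java.util.function.IntConsumer;

/**
 * Hypothetical model of how a synchronous exception leaves a "pending read" flag stuck.
 * FakeCursor is a stand-in for ManagedCursorImpl, not the real API.
 */
public class StuckPendingRead {

    /** Stand-in cursor whose async read validates its argument up front. */
    static class FakeCursor {
        void asyncReadEntriesOrWait(int numberOfEntriesToRead, IntConsumer callback) {
            // Mirrors a Guava-style checkArgument(numberOfEntriesToRead > 0):
            // the exception is thrown before any callback is scheduled.
            if (numberOfEntriesToRead <= 0) {
                throw new IllegalArgumentException();
            }
            callback.accept(numberOfEntriesToRead); // completes immediately in this model
        }
    }

    final FakeCursor cursor = new FakeCursor();
    boolean havePendingRead = false;
    int readsIssued = 0;

    void readMoreEntries(int messagesToRead) {
        if (havePendingRead) {
            // Corresponds to "Cannot schedule next read until previous one is done"
            return;
        }
        havePendingRead = true;
        readsIssued++;
        // The flag is cleared only by the callback, which never runs if this call throws.
        cursor.asyncReadEntriesOrWait(messagesToRead, entries -> havePendingRead = false);
    }

    public static void main(String[] args) {
        StuckPendingRead d = new StuckPendingRead();
        try {
            d.readMoreEntries(0); // a non-positive count produced by the race
        } catch (IllegalArgumentException e) {
            System.out.println("read failed: " + e.getMessage()); // prints "read failed: null"
        }
        d.readMoreEntries(10); // silently skipped: the flag is still true
        System.out.println("havePendingRead=" + d.havePendingRead + ", readsIssued=" + d.readsIssued);
    }
}
```

Running `main` prints `read failed: null` (the exception is created without a message) followed by `havePendingRead=true, readsIssued=1`: every later read request is skipped, just as the broker stopped reading new entries until the topic was unloaded.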
### Modifications
I fixed the code that causes this bug as follows:
```diff
-if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
- int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
+int currentTotalAvailablePermits = totalAvailablePermits;
+if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+ int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
```
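In addition, if `messagesToRead` is 0 or less, it is corrected to 1 before being passed to `ManagedCursorImpl#asyncReadEntriesOrWait()`. This safeguard is added in `PersistentDispatcherMultipleConsumers`, `PersistentDispatcherSingleActiveConsumer`, and `PersistentReplicator`.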
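A minimal sketch of both safeguards, using a simplified single-threaded model: the field is read once into a local, and the result is clamped to at least 1. The class `PermitSnapshot`, the `Runnable` hook that simulates a concurrent update, and the batch size of 100 are hypothetical; only the local-snapshot and `Math.max(messagesToRead, 1)` patterns come from the diff.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of the two safeguards: snapshot the shared field once,
 * then clamp the computed count. Not the Pulsar implementation.
 */
public class PermitSnapshot {
    int totalAvailablePermits;      // shared field in the real dispatcher
    final int readBatchSize = 100;  // assumed value for illustration

    PermitSnapshot(int permits) {
        this.totalAvailablePermits = permits;
    }

    /** Fixed pattern: a single read of the shared field, then a defensive clamp. */
    OptionalInt messagesToRead(Runnable concurrentUpdate) {
        int currentTotalAvailablePermits = totalAvailablePermits; // single read
        if (currentTotalAvailablePermits > 0) {
            concurrentUpdate.run(); // later updates no longer affect this computation
            int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
            // Redundant in this simplified model, but mirrors the commit's defensive check.
            messagesToRead = Math.max(messagesToRead, 1);
            return OptionalInt.of(messagesToRead);
        }
        return OptionalInt.empty(); // no read scheduled
    }

    public static void main(String[] args) {
        PermitSnapshot d = new PermitSnapshot(5);
        // Simulate another thread dropping the permits to 0 after the check has passed.
        OptionalInt n = d.messagesToRead(() -> d.totalAvailablePermits = 0);
        System.out.println("messagesToRead = " + n.getAsInt()); // prints "messagesToRead = 5"
    }
}
```

Even though the simulated update drops the permits to 0 after the check, the computation uses the snapshot, so `main` prints `messagesToRead = 5`. The snapshot does not stop other threads from changing the permits; it only guarantees that the check and the computation agree on one value. The clamp is a second line of defense, which is why the commit adds it to all three classes.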
---
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 8 ++++++--
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 3 +++
.../pulsar/broker/service/persistent/PersistentReplicator.java | 3 +++
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 74d131f..b93cc4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -251,8 +251,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
public void readMoreEntries() {
- if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
- int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
+ // totalAvailablePermits may be updated by other threads
+ int currentTotalAvailablePermits = totalAvailablePermits;
+ if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+ int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
if (!isConsumerWritable()) {
// If the connection is not currently writable, we issue the read request anyway, but for a single
@@ -314,6 +316,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return;
}
+ // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+ messagesToRead = Math.max(messagesToRead, 1);
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4ad0f5a..2062744 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -437,6 +437,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
}
+ // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+ messagesToRead = Math.max(messagesToRead, 1);
+
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index fef23e6..12fb877 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -276,6 +276,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
messagesToRead = 1;
}
+ // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+ messagesToRead = Math.max(messagesToRead, 1);
+
// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
if (log.isDebugEnabled()) {