You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/08 01:52:25 UTC
[pulsar] branch master updated: Fix 8115 Some partitions get stuck
after adding additional consumers to the KEY_SHARED subscriptions (#10096)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c4f154e Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
c4f154e is described below
commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc
Author: baomingyu <ba...@163.com>
AuthorDate: Thu Apr 8 09:51:39 2021 +0800
Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
Fixes #8115
Master Issue: #8115
### Motivation
first point:
Sometimes it will not success to call this method and the method readMoreEntries will not be called
` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
} `
second point:
Sometimes keyNumbers will not be decrement to zero , and broker will not be start next loop to readMoreEntries.
some partition topic will be stunk and stop to push message to consumer ,even though there is permits in consumers.
---
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3107f13..d27df30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -171,6 +172,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
+ int currentThreadKeyNumber = groupedEntries.size();
+ if (currentThreadKeyNumber == 0) {
+ currentThreadKeyNumber = -1;
+ }
for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
Consumer consumer = current.getKey();
List<Entry> entriesWithSameKey = current.getValue();
@@ -214,7 +219,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()).addListener(future -> {
- if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
+ if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
});
@@ -223,6 +228,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
-(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
+ } else {
+ currentThreadKeyNumber = keyNumbers.decrementAndGet();
}
}
@@ -260,6 +267,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
readMoreEntries();
+ } else if (currentThreadKeyNumber == 0) {
+ topic.getBrokerService().executor().schedule(() -> {
+ synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
+ readMoreEntries();
+ }
+ }, 100, TimeUnit.MILLISECONDS);
}
}