You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:45:26 UTC
[pulsar] branch master updated: Fix remove pending acks in
Key_Shared subscription (#4407)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4464dbb Fix remove pending acks in Key_Shared subscription (#4407)
4464dbb is described below
commit 4464dbb2bc6efdcc8fda806dce74f30640c3153e
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri May 31 23:45:21 2019 +0800
Fix remove pending acks in Key_Shared subscription (#4407)
* Fix remove pending acks in key_shared subscription.
* Fix stats in key_shared subscription.
* Add subscription utils to check isCumulativeAckMode or isIndividualAckMode.
* Use Subscription.isIndividualAckMode check in subscription stats
---
.../main/java/org/apache/pulsar/broker/service/Consumer.java | 8 ++++----
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../org/apache/pulsar/broker/service/StreamingStats.java | 2 +-
.../java/org/apache/pulsar/broker/service/Subscription.java | 9 +++++++++
.../broker/service/persistent/PersistentSubscription.java | 2 +-
.../pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../apache/pulsar/client/api/KeySharedSubscriptionTest.java | 12 +++---------
7 files changed, 20 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index e2c7fe1..840a0ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -138,7 +138,7 @@ public class Consumer {
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;
- if (subType == SubType.Shared || subType == SubType.Key_Shared) {
+ if (Subscription.isIndividualAckMode(subType)) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
@@ -334,7 +334,7 @@ public class Consumer {
return;
}
- if (subType == SubType.Shared) {
+ if (Subscription.isIndividualAckMode(subType)) {
log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
return;
}
@@ -350,7 +350,7 @@ public class Consumer {
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
positionsAcked.add(position);
- if (subType == SubType.Shared) {
+ if (Subscription.isIndividualAckMode(subType)) {
removePendingAcks(position);
}
@@ -424,7 +424,7 @@ public class Consumer {
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
- return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
+ return Subscription.isIndividualAckMode(subType) && maxUnackedMessages > 0;
}
public void updateRates() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ddb619f..9447af1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1104,7 +1104,7 @@ public class ServerCnx extends PulsarHandler {
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
- if (redeliver.getMessageIdsCount() > 0 && (consumer.subType() == SubType.Shared || consumer.subType() == SubType.Key_Shared)) {
+ if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
consumer.redeliverUnacknowledgedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index a5614e8..892cdca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,7 +65,7 @@ public class StreamingStats {
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
- if (PulsarApi.CommandSubscribe.SubType.Shared.equals(subType)) {
+ if (Subscription.isIndividualAckMode(subType)) {
statsStream.writePair("unackedMessages", stats.unackedMessages);
statsStream.writePair("blockedConsumerOnUnackedMsgs", stats.blockedConsumerOnUnackedMsgs);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 345ef0b..4851b5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -89,4 +89,13 @@ public interface Subscription {
String getTypeString();
void addUnAckedMessages(int unAckMessages);
+
+ // Subscription utils
+ static boolean isCumulativeAckMode(SubType subType) {
+ return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
+ }
+
+ static boolean isIndividualAckMode(SubType subType) {
+ return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8cb97c5..42a1479 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -670,7 +670,7 @@ public class PersistentSubscription implements Subscription {
subStats.activeConsumerName = activeConsumer.consumerName();
}
}
- if (SubType.Shared.equals(subStats.type)) {
+ if (Subscription.isIndividualAckMode(subStats.type)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
subStats.unackedMessages = d.getTotalUnackedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22650d3..f60302e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1395,7 +1395,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
topicStatsStream.writePair("type", subscription.getTypeString());
- if (SubType.Shared.equals(subscription.getType())) {
+ if (Subscription.isIndividualAckMode(subscription.getType())) {
if(subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subscription.getDispatcher();
topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs", dispatcher.isBlockedDispatcherOnUnackedMsgs());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index c5f42bb..9c67817 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -198,18 +198,12 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
receiveAndCheck(checkList);
+ // wait for consumer grouping acking send.
+ Thread.sleep(1000);
+
consumer1.close();
consumer2.close();
- // avoid message replay
- Message<Integer> message;
- do {
- message = consumer3.receive(1000, TimeUnit.MILLISECONDS);
- if (message != null) {
- consumer3.acknowledge(message);
- }
- } while (message != null);
-
for (int i = 0; i < 10; i++) {
for (String key : keys) {
producer.newMessage()