You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:45:26 UTC

[pulsar] branch master updated: Fix remove pending acks in Key_Shared subscription (#4407)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4464dbb  Fix remove pending acks in Key_Shared subscription (#4407)
4464dbb is described below

commit 4464dbb2bc6efdcc8fda806dce74f30640c3153e
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri May 31 23:45:21 2019 +0800

    Fix remove pending acks in Key_Shared subscription (#4407)
    
    * Fix removal of pending acks in Key_Shared subscriptions.
    
    * Fix stats for Key_Shared subscriptions.
    
    * Add Subscription utility methods isCumulativeAckMode and isIndividualAckMode to check a subscription type's ack mode.
    
    * Use the Subscription.isIndividualAckMode check in subscription stats.
---
 .../main/java/org/apache/pulsar/broker/service/Consumer.java |  8 ++++----
 .../java/org/apache/pulsar/broker/service/ServerCnx.java     |  2 +-
 .../org/apache/pulsar/broker/service/StreamingStats.java     |  2 +-
 .../java/org/apache/pulsar/broker/service/Subscription.java  |  9 +++++++++
 .../broker/service/persistent/PersistentSubscription.java    |  2 +-
 .../pulsar/broker/service/persistent/PersistentTopic.java    |  2 +-
 .../apache/pulsar/client/api/KeySharedSubscriptionTest.java  | 12 +++---------
 7 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index e2c7fe1..840a0ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -138,7 +138,7 @@ public class Consumer {
         stats.setClientVersion(cnx.getClientVersion());
         stats.metadata = this.metadata;
 
-        if (subType == SubType.Shared || subType == SubType.Key_Shared) {
+        if (Subscription.isIndividualAckMode(subType)) {
             this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
         } else {
             // We don't need to keep track of pending acks if the subscription is not shared
@@ -334,7 +334,7 @@ public class Consumer {
                 return;
             }
 
-            if (subType == SubType.Shared) {
+            if (Subscription.isIndividualAckMode(subType)) {
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
                 return;
             }
@@ -350,7 +350,7 @@ public class Consumer {
                 PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
                 positionsAcked.add(position);
 
-                if (subType == SubType.Shared) {
+                if (Subscription.isIndividualAckMode(subType)) {
                     removePendingAcks(position);
                 }
 
@@ -424,7 +424,7 @@ public class Consumer {
      * @return
      */
     private boolean shouldBlockConsumerOnUnackMsgs() {
-        return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
+        return Subscription.isIndividualAckMode(subType) && maxUnackedMessages > 0;
     }
 
     public void updateRates() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ddb619f..9447af1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1104,7 +1104,7 @@ public class ServerCnx extends PulsarHandler {
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
-            if (redeliver.getMessageIdsCount() > 0 && (consumer.subType() == SubType.Shared || consumer.subType() == SubType.Key_Shared)) {
+            if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
                 consumer.redeliverUnacknowledgedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index a5614e8..892cdca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,7 +65,7 @@ public class StreamingStats {
         statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
         statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
 
-        if (PulsarApi.CommandSubscribe.SubType.Shared.equals(subType)) {
+        if (Subscription.isIndividualAckMode(subType)) {
             statsStream.writePair("unackedMessages", stats.unackedMessages);
             statsStream.writePair("blockedConsumerOnUnackedMsgs", stats.blockedConsumerOnUnackedMsgs);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 345ef0b..4851b5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -89,4 +89,13 @@ public interface Subscription {
     String getTypeString();
 
     void addUnAckedMessages(int unAckMessages);
+
+    // Subscription utils
+    static boolean isCumulativeAckMode(SubType subType) {
+        return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
+    }
+
+    static boolean isIndividualAckMode(SubType subType) {
+        return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8cb97c5..42a1479 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -670,7 +670,7 @@ public class PersistentSubscription implements Subscription {
                 subStats.activeConsumerName = activeConsumer.consumerName();
             }
         }
-        if (SubType.Shared.equals(subStats.type)) {
+        if (Subscription.isIndividualAckMode(subStats.type)) {
             if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
                 PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
                 subStats.unackedMessages = d.getTotalUnackedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22650d3..f60302e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1395,7 +1395,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
                 topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
                 topicStatsStream.writePair("type", subscription.getTypeString());
-                if (SubType.Shared.equals(subscription.getType())) {
+                if (Subscription.isIndividualAckMode(subscription.getType())) {
                     if(subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                         PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subscription.getDispatcher();
                         topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs",  dispatcher.isBlockedDispatcherOnUnackedMsgs());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index c5f42bb..9c67817 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -198,18 +198,12 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
         receiveAndCheck(checkList);
 
+        // wait for consumer grouping acking send.
+        Thread.sleep(1000);
+
         consumer1.close();
         consumer2.close();
 
-        // avoid message replay
-        Message<Integer> message;
-        do {
-            message = consumer3.receive(1000, TimeUnit.MILLISECONDS);
-            if (message != null) {
-                consumer3.acknowledge(message);
-            }
-        } while (message != null);
-
         for (int i = 0; i < 10; i++) {
             for (String key : keys) {
                 producer.newMessage()