You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:38 UTC

[11/13] git commit: Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered

Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42ad1039
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42ad1039
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42ad1039

Branch: refs/heads/trunk
Commit: 42ad1039cb51b47a16f46f3d8f0fe8bf36ffdd1d
Parents: 8644090
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 13:15:04 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../activemq/broker/region/DurableTopicSubscription.java  |  5 +++++
 .../policy/RetainedMessageSubscriptionRecoveryPolicy.java | 10 +++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index c82e6ef..e61a608 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -137,6 +137,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         dispatchPending();
     }
 
+    // used by RetaineMessageSubscriptionRecoveryPolicy
+    public boolean isEmpty(Topic topic) {
+        return pending.isEmpty(topic);
+    }
+
     public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
         if (!active.get()) {
             this.context = context;

http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index f1573fa..ea07c8b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
@@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
             sub.addRecoveredMessage(context, retainedMessage);
         }
         if (wrapped != null) {
-            wrapped.recover(context, topic, sub);
+            // retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions
+            boolean recover = true;
+            if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) {
+                recover = false;
+            }
+            if (recover) {
+                wrapped.recover(context, topic, sub);
+            }
         }
     }