You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:33 UTC

[06/13] git commit: Fixed AMQ-5160, changed DurableTopicSubscription to only recover retroactive messages for retroactive topics or consumers

Fixed AMQ-5160, changed DurableTopicSubscription to only recover retroactive messages for retroactive topics or consumers


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/78950ec5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/78950ec5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/78950ec5

Branch: refs/heads/trunk
Commit: 78950ec596377fc3417fc99ba67b03ec10e89cf9
Parents: 70f7c58
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 12:14:21 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../activemq/broker/region/DurableTopicSubscription.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/78950ec5/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 6501e58..c82e6ef 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -120,8 +120,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            // always use the recovery policy
-            topic.recoverRetroactiveMessages(context, this);
+            if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
+                topic.recoverRetroactiveMessages(context, this);
+            }
             this.enqueueCounter += pending.size();
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -166,10 +167,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                     pending.setMaxAuditDepth(getMaxAuditDepth());
                     pending.setMaxProducersToAudit(getMaxProducersToAudit());
                     pending.start();
-                    // always use the recovery policy.
+                    // use recovery policy for retroactive topics and consumers
                     for (Destination destination : durableDestinations.values()) {
                         Topic topic = (Topic) destination;
-                        topic.recoverRetroactiveMessages(context, this);
+                        if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
+                            topic.recoverRetroactiveMessages(context, this);
+                        }
                     }
                 }
             }