You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:35 UTC

[08/13] git commit: Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages

Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8947a09e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8947a09e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8947a09e

Branch: refs/heads/trunk
Commit: 8947a09eaa075cb8f5e86599404198e0a5c91910
Parents: b36adff
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:29:03 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../broker/region/DurableTopicSubscription.java     | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8947a09e/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 8cb6ecc..6501e58 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -120,9 +120,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            if (pending.isEmpty(topic)) {
-                topic.recoverRetroactiveMessages(context, this);
-            }
+            // always use the recovery policy
+            topic.recoverRetroactiveMessages(context, this);
             this.enqueueCounter += pending.size();
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -167,13 +166,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                     pending.setMaxAuditDepth(getMaxAuditDepth());
                     pending.setMaxProducersToAudit(getMaxProducersToAudit());
                     pending.start();
-                    // If nothing was in the persistent store, then try to use the
-                    // recovery policy.
-                    if (pending.isEmpty()) {
-                        for (Destination destination : durableDestinations.values()) {
-                            Topic topic = (Topic) destination;
-                            topic.recoverRetroactiveMessages(context, this);
-                        }
+                    // always use the recovery policy.
+                    for (Destination destination : durableDestinations.values()) {
+                        Topic topic = (Topic) destination;
+                        topic.recoverRetroactiveMessages(context, this);
                     }
                 }
             }