You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:35 UTC
[08/13] git commit: Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages
Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8947a09e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8947a09e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8947a09e
Branch: refs/heads/trunk
Commit: 8947a09eaa075cb8f5e86599404198e0a5c91910
Parents: b36adff
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:29:03 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200
----------------------------------------------------------------------
.../broker/region/DurableTopicSubscription.java | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8947a09e/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 8cb6ecc..6501e58 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -120,9 +120,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic) destination;
topic.activate(context, this);
- if (pending.isEmpty(topic)) {
- topic.recoverRetroactiveMessages(context, this);
- }
+ // always use the recovery policy
+ topic.recoverRetroactiveMessages(context, this);
this.enqueueCounter += pending.size();
} else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -167,13 +166,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start();
- // If nothing was in the persistent store, then try to use the
- // recovery policy.
- if (pending.isEmpty()) {
- for (Destination destination : durableDestinations.values()) {
- Topic topic = (Topic) destination;
- topic.recoverRetroactiveMessages(context, this);
- }
+ // always use the recovery policy.
+ for (Destination destination : durableDestinations.values()) {
+ Topic topic = (Topic) destination;
+ topic.recoverRetroactiveMessages(context, this);
}
}
}