You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:38 UTC
[11/13] git commit: Fixed AMQ-5160,
restored previous DurableSubscription behaviour of only recovering
messages when cursor is empty, retained messages are always recovered
Fixed AMQ-5160: restored the previous DurableSubscription behaviour of recovering messages only when the cursor is empty; retained messages are always recovered.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42ad1039
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42ad1039
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42ad1039
Branch: refs/heads/trunk
Commit: 42ad1039cb51b47a16f46f3d8f0fe8bf36ffdd1d
Parents: 8644090
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 13:15:04 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200
----------------------------------------------------------------------
.../activemq/broker/region/DurableTopicSubscription.java | 5 +++++
.../policy/RetainedMessageSubscriptionRecoveryPolicy.java | 10 +++++++++-
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index c82e6ef..e61a608 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -137,6 +137,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
dispatchPending();
}
+ // used by RetainedMessageSubscriptionRecoveryPolicy
+ public boolean isEmpty(Topic topic) {
+ return pending.isEmpty(topic);
+ }
+
public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
if (!active.get()) {
this.context = context;
http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index f1573fa..ea07c8b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic;
@@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
sub.addRecoveredMessage(context, retainedMessage);
}
if (wrapped != null) {
- wrapped.recover(context, topic, sub);
+ // retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions
+ boolean recover = true;
+ if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) {
+ recover = false;
+ }
+ if (recover) {
+ wrapped.recover(context, topic, sub);
+ }
}
}