You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/14 23:43:16 UTC

svn commit: r1409489 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java

Author: tabish
Date: Wed Nov 14 22:43:15 2012
New Revision: 1409489

URL: http://svn.apache.org/viewvc?rev=1409489&view=rev
Log:
fix for breakage caused by earlier fix to: https://issues.apache.org/jira/browse/AMQ-4062

Only reconfigure when keepDurableSubsActive=false

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1409489&r1=1409488&r2=1409489&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Nov 14 22:43:15 2012
@@ -51,8 +51,8 @@ public class DurableTopicSubscription ex
     private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
-    private AtomicBoolean active = new AtomicBoolean();
-    private AtomicLong offlineTimestamp = new AtomicLong(-1);
+    private final AtomicBoolean active = new AtomicBoolean();
+    private final AtomicLong offlineTimestamp = new AtomicLong(-1);
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
             throws JMSException {
@@ -76,10 +76,12 @@ public class DurableTopicSubscription ex
         offlineTimestamp.set(timestamp);
     }
 
+    @Override
     public boolean isFull() {
         return !active.get() || super.isFull();
     }
 
+    @Override
     public void gc() {
     }
 
@@ -87,6 +89,7 @@ public class DurableTopicSubscription ex
      * store will have a pending ack for all durables, irrespective of the
      * selector so we need to ack if node is un-matched
      */
+    @Override
     public void unmatched(MessageReference node) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
@@ -100,6 +103,7 @@ public class DurableTopicSubscription ex
         // statically configured via maxPageSize
     }
 
+    @Override
     public void add(ConnectionContext context, Destination destination) throws Exception {
         if (!destinations.contains(destination)) {
             super.add(context, destination);
@@ -135,15 +139,6 @@ public class DurableTopicSubscription ex
             this.context = context;
             this.info = info;
 
-            // On Activation we should update the configuration based on our new consumer info.
-            ActiveMQDestination dest = this.info.getDestination();
-            if (dest != null && regionBroker.getDestinationPolicy() != null) {
-                PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
-                if (entry != null) {
-                    entry.configure(broker, usageManager, this);
-                }
-            }
-
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Activating " + this);
             }
@@ -153,7 +148,17 @@ public class DurableTopicSubscription ex
                     add(context, topic);
                     topic.activate(context, this);
                 }
+
+                // On Activation we should update the configuration based on our new consumer info.
+                ActiveMQDestination dest = this.info.getDestination();
+                if (dest != null && regionBroker.getDestinationPolicy() != null) {
+                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
+                    if (entry != null) {
+                        entry.configure(broker, usageManager, this);
+                    }
+                }
             }
+
             synchronized (pendingLock) {
                 pending.setSystemUsage(memoryManager);
                 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
@@ -234,6 +239,7 @@ public class DurableTopicSubscription ex
         prefetchExtension.set(0);
     }
 
+    @Override
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);
         if (node != QueueMessageReference.NULL_MESSAGE) {
@@ -245,6 +251,7 @@ public class DurableTopicSubscription ex
         return md;
     }
 
+    @Override
     public void add(MessageReference node) throws Exception {
         if (!active.get() && !keepDurableSubsActive) {
             return;
@@ -252,6 +259,7 @@ public class DurableTopicSubscription ex
         super.add(node);
     }
 
+    @Override
     protected void dispatchPending() throws IOException {
         if (isActive()) {
             super.dispatchPending();
@@ -262,12 +270,14 @@ public class DurableTopicSubscription ex
         pending.remove(node);
     }
 
+    @Override
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         synchronized (pending) {
             pending.addRecoveredMessage(message);
         }
     }
 
+    @Override
     public int getPendingQueueSize() {
         if (active.get() || keepDurableSubsActive) {
             return super.getPendingQueueSize();
@@ -276,14 +286,17 @@ public class DurableTopicSubscription ex
         return 0;
     }
 
+    @Override
     public void setSelector(String selector) throws InvalidSelectorException {
         throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
     }
 
+    @Override
     protected boolean canDispatch(MessageReference node) {
         return isActive();
     }
 
+    @Override
     protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
         Destination regionDestination = (Destination) node.getRegionDestination();
         regionDestination.acknowledge(context, this, ack, node);
@@ -291,6 +304,7 @@ public class DurableTopicSubscription ex
         node.decrementReferenceCount();
     }
 
+    @Override
     public synchronized String toString() {
         return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
                 + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
@@ -304,6 +318,7 @@ public class DurableTopicSubscription ex
     /**
      * Release any references that we are holding.
      */
+    @Override
     public void destroy() {
         synchronized (pendingLock) {
             try {
@@ -327,6 +342,7 @@ public class DurableTopicSubscription ex
         setSlowConsumer(false);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
             try {
@@ -337,6 +353,7 @@ public class DurableTopicSubscription ex
         }
     }
 
+    @Override
     protected boolean isDropped(MessageReference node) {
         return false;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java?rev=1409489&r1=1409488&r2=1409489&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java Wed Nov 14 22:43:15 2012
@@ -96,6 +96,7 @@ public class AMQ4062Test {
         service=new BrokerService();
         service.setPersistent(true);
         service.setUseJmx(false);
+        service.setKeepDurableSubsActive(false);
 
         KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter();
         File dataFile=new File("createData");