You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/05/15 14:31:10 UTC

svn commit: r1482790 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/ activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-unit-tests/sr...

Author: dejanb
Date: Wed May 15 12:31:09 2013
New Revision: 1482790

URL: http://svn.apache.org/r1482790
Log:
https://issues.apache.org/jira/browse/AMQ-4000 - as part of this feature we need to properly send advisories when durable sub unregisters

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed May 15 12:31:09 2013
@@ -25,29 +25,14 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.broker.region.*;
+import org.apache.activemq.command.*;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -264,6 +249,29 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
+    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
+        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+
+        DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key);
+
+        if (sub == null) {
+            LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
+            return;
+        }
+
+        ActiveMQDestination dest = sub.getConsumerInfo().getDestination();
+
+        super.removeSubscription(context, info);
+
+        // Don't advise advisory topics.
+        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
+            fireConsumerAdvisory(context, dest, topic, info);
+        }
+
+    }
+
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.removeProducer(context, info);
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Wed May 15 12:31:09 2013
@@ -405,7 +405,7 @@ public class BrokerView implements Broke
         ConnectionContext context = new ConnectionContext();
         context.setBroker(safeGetBroker());
         context.setClientId(clientId);
-        safeGetBroker().removeSubscription(context, info);
+        brokerService.getBroker().removeSubscription(context, info);
     }
 
     //  doc comment inherited from BrokerViewMBean

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Wed May 15 12:31:09 2013
@@ -20,6 +20,7 @@ import javax.management.openmbean.Compos
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
@@ -31,6 +32,7 @@ import org.apache.activemq.command.Remov
 public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
 
     protected ManagedRegionBroker broker;
+    protected BrokerService brokerService;
     protected String subscriptionName;
     protected DurableTopicSubscription durableSub;
 
@@ -40,9 +42,10 @@ public class DurableSubscriptionView ext
      * @param clientId
      * @param sub
      */
-    public DurableSubscriptionView(ManagedRegionBroker broker, String clientId, String userName, Subscription sub) {
+    public DurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, String userName, Subscription sub) {
         super(clientId, userName, sub);
         this.broker = broker;
+        this.brokerService = brokerService;
         this.durableSub=(DurableTopicSubscription) sub;
         if (sub != null) {
             this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
@@ -87,7 +90,7 @@ public class DurableSubscriptionView ext
         ConnectionContext context = new ConnectionContext();
         context.setBroker(broker);
         context.setClientId(clientId);
-        broker.removeSubscription(context, info);
+        brokerService.getBroker().removeSubscription(context, info);
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Wed May 15 12:31:09 2013
@@ -20,6 +20,7 @@ import javax.management.openmbean.Compos
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ConsumerInfo;
@@ -41,8 +42,8 @@ public class InactiveDurableSubscription
      * @param userName
      * @param subInfo
      */
-    public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
-        super(broker, clientId, null, subscription);
+    public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
+        super(broker, brokerService, clientId, null, subscription);
         this.broker = broker;
         this.subscriptionInfo = subInfo;
     }
@@ -134,7 +135,7 @@ public class InactiveDurableSubscription
         ConnectionContext context = new ConnectionContext();
         context.setBroker(broker);
         context.setClientId(clientId);
-        broker.removeSubscription(context, info);
+        brokerService.getBroker().removeSubscription(context, info);
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed May 15 12:31:09 2013
@@ -206,7 +206,7 @@ public class ManagedRegionBroker extends
             } else {
                 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
                 if (sub.getConsumerInfo().isDurable()) {
-                    view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
+                    view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
                 } else {
                     if (sub instanceof TopicSubscription) {
                         view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
@@ -509,7 +509,7 @@ public class ManagedRegionBroker extends
         try {
             ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
             ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
-            SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
+            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
 
             try {
                 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed May 15 12:31:09 2013
@@ -411,6 +411,7 @@ public class RegionBroker extends EmptyB
             topicRegion.removeSubscription(context, info);
         } finally {
             inactiveDestinationsPurgeLock.readLock().unlock();
+
         }
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed May 15 12:31:09 2013
@@ -378,4 +378,8 @@ public class TopicRegion extends Abstrac
     public boolean durableSubscriptionExists(SubscriptionKey key) {
         return this.durableSubscriptions.containsKey(key);
     }
+
+    public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) {
+        return durableSubscriptions.get(key);
+    }
 }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1482790&r1=1482789&r2=1482790&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java Wed May 15 12:31:09 2013
@@ -18,15 +18,16 @@ package org.apache.activemq.usecases;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.jms.Connection;
-import javax.jms.Session;
+import javax.jms.*;
 import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.TestSupport;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -34,6 +35,7 @@ import org.apache.activemq.broker.jmx.Du
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -70,12 +72,14 @@ public class DurableSubscriptionUnsubscr
 
     public void doJMXUnsubscribe(boolean restart) throws Exception {
         createSubscriptions();
+        createAdvisorySubscription();
 
         Thread.sleep(1000);
         assertCount(100, 0);
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(100, 0);
         }
 
@@ -97,12 +101,14 @@ public class DurableSubscriptionUnsubscr
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(0, 0);
         }
     }
 
     public void doConnectionUnsubscribe(boolean restart) throws Exception {
         createSubscriptions();
+        createAdvisorySubscription();
 
         Thread.sleep(1000);
         assertCount(100, 0);
@@ -131,6 +137,7 @@ public class DurableSubscriptionUnsubscr
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(100, 0);
         }
 
@@ -150,18 +157,21 @@ public class DurableSubscriptionUnsubscr
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(0, 0);
         }
     }
 
     public void doDirectUnsubscribe(boolean restart) throws Exception {
         createSubscriptions();
+        createAdvisorySubscription();
 
         Thread.sleep(1000);
         assertCount(100, 0);
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(100, 0);
         }
 
@@ -172,9 +182,10 @@ public class DurableSubscriptionUnsubscr
             ConnectionContext context = new ConnectionContext();
             context.setBroker(broker.getRegionBroker());
             context.setClientId(getName());
-            broker.getRegionBroker().removeSubscription(context, info);
+            broker.getBroker().removeSubscription(context, info);
 
             if (i % 20 == 0) {
+                Thread.sleep(1000);
                 assertCount(100 - i - 1, 0);
             }
         }
@@ -183,6 +194,7 @@ public class DurableSubscriptionUnsubscr
 
         if (restart) {
             restartBroker();
+            createAdvisorySubscription();
             assertCount(0, 0);
         }
     }
@@ -195,6 +207,20 @@ public class DurableSubscriptionUnsubscr
         }
     }
 
+    private final AtomicInteger advisories = new AtomicInteger(0);
+
+    private void createAdvisorySubscription() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic));
+        advisoryConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                if (((ActiveMQMessage)message).getDataStructure() instanceof RemoveSubscriptionInfo) {
+                    advisories.incrementAndGet();
+                }
+            }
+        });
+    }
 
     private void assertCount(int all, int active) throws Exception {
         int inactive = all - active;
@@ -224,6 +250,9 @@ public class DurableSubscriptionUnsubscr
         // check the strange false MBean
         if (all == 0)
             assertEquals(0, countMBean());
+
+        // check if we got all advisories
+        assertEquals(100, all + advisories.get());
     }
 
     private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {