You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/03 16:06:21 UTC

svn commit: r1030490 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/java/org/apache/activemq/broker/jmx/ activemq-core/src/test/jav...

Author: dejanb
Date: Wed Nov  3 15:06:21 2010
New Revision: 1030490

URL: http://svn.apache.org/viewvc?rev=1030490&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3013 - durable subs and jmx

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java
    activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Nov  3 15:06:21 2010
@@ -179,21 +179,29 @@ public class ManagedRegionBroker extends
         String connectionClientId = context.getClientId();
         ObjectName brokerJmxObjectName = brokerObjectName;
         String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
-
         SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
         try {
             ObjectName objectName = new ObjectName(objectNameStr);
             SubscriptionView view;
-            if (sub.getConsumerInfo().isDurable()) {
-                view = new DurableSubscriptionView(this, context.getClientId(), sub);
+            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
+                // add offline subscribers to inactive list
+                SubscriptionInfo info = new SubscriptionInfo();
+                info.setClientId(context.getClientId());
+                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
+                info.setDestination(sub.getConsumerInfo().getDestination());
+                addInactiveSubscription(key, info);
             } else {
-                if (sub instanceof TopicSubscription) {
-                    view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription)sub);
+                if (sub.getConsumerInfo().isDurable()) {
+                    view = new DurableSubscriptionView(this, context.getClientId(), sub);
                 } else {
-                    view = new SubscriptionView(context.getClientId(), sub);
+                    if (sub instanceof TopicSubscription) {
+                        view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
+                    } else {
+                        view = new SubscriptionView(context.getClientId(), sub);
+                    }
                 }
+                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
             }
-            registerSubscription(objectName, sub.getConsumerInfo(), key, view);
             subscriptionMap.put(sub, objectName);
             return objectName;
         } catch (Exception e) {
@@ -227,11 +235,38 @@ public class ManagedRegionBroker extends
         return objectNameStr;
     }
 
+    @Override
+    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+        Subscription sub = super.addConsumer(context, info);
+        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
+        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
+        if (inactiveName != null) {
+            // if it was inactive, register it
+            registerSubscription(context, sub);
+        }
+        return sub;
+    }
+
+    @Override
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+        for (Subscription sub : subscriptionMap.keySet()) {
+            if (sub.getConsumerInfo().equals(info)) {
+               // unregister all consumer subs
+               unregisterSubscription(subscriptionMap.get(sub), true);
+            }
+        }
+        super.removeConsumer(context, info);
+    }
+
     public void unregisterSubscription(Subscription sub) {
         ObjectName name = subscriptionMap.remove(sub);
         if (name != null) {
             try {
-                unregisterSubscription(name);
+                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
+                ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
+                if (inactiveName != null) {
+                    inactiveDurableTopicSubscribers.remove(inactiveName);
+                }
             } catch (Exception e) {
                 LOG.error("Failed to unregister subscription " + sub, e);
             }
@@ -337,10 +372,9 @@ public class ManagedRegionBroker extends
 
     }
 
-    protected void unregisterSubscription(ObjectName key) throws Exception {
+    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
         queueSubscribers.remove(key);
         topicSubscribers.remove(key);
-        inactiveDurableTopicSubscribers.remove(key);
         temporaryQueueSubscribers.remove(key);
         temporaryTopicSubscribers.remove(key);
         if (registeredMBeans.remove(key)) {
@@ -355,11 +389,13 @@ public class ManagedRegionBroker extends
         if (view != null) {
             // need to put this back in the inactive list
             SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
-            SubscriptionInfo info = new SubscriptionInfo();
-            info.setClientId(subscriptionKey.getClientId());
-            info.setSubscriptionName(subscriptionKey.getSubscriptionName());
-            info.setDestination(new ActiveMQTopic(view.getDestinationName()));
-            addInactiveSubscription(subscriptionKey, info);
+            if (addToInactive) {
+                SubscriptionInfo info = new SubscriptionInfo();
+                info.setClientId(subscriptionKey.getClientId());
+                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
+                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
+                addInactiveSubscription(subscriptionKey, info);
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Nov  3 15:06:21 2010
@@ -138,7 +138,6 @@ public class TopicRegion extends Abstrac
             throw new JMSException("Durable consumer is in use");
         }
 
-        durableSubscriptions.remove(key);
         synchronized (destinationsMutex) {
             for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
             	Destination dest = iter.next();
@@ -149,7 +148,12 @@ public class TopicRegion extends Abstrac
             	}
             }
         }
-        super.removeConsumer(context, sub.getConsumerInfo());
+        if (subscriptions.get(sub.getConsumerInfo()) != null) {
+            super.removeConsumer(context, sub.getConsumerInfo());
+        } else {
+            // try destroying inactive subscriptions
+            destroySubscription(sub);
+        }
     }
 
     @Override
@@ -159,7 +163,6 @@ public class TopicRegion extends Abstrac
 
     @Override
     protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
-
         List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
         Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
 
@@ -210,7 +213,6 @@ public class TopicRegion extends Abstrac
                 }
             }
         }
-
         return rc;
     }
 
@@ -250,6 +252,7 @@ public class TopicRegion extends Abstrac
             if (sub == null) {
                 
                 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
+
                 if (destination != null && broker.getDestinationPolicy() != null) {
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Nov  3 15:06:21 2010
@@ -442,7 +442,7 @@ public class MBeanTest extends EmbeddedB
         String selector = null;
         ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector);
         broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
-        assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
+        assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length);
 
         assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
 
@@ -450,7 +450,7 @@ public class MBeanTest extends EmbeddedB
 
         // now lets try destroy it
         broker.destroyDurableSubscriber(clientID, "subscriber1");
-        assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
+        assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length);
     }
 
     protected void assertConsumerCounts() throws Exception {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1030490&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java Wed Nov  3 15:06:21 2010
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+
+public class DurableSubscriptionUnsubscribeTest extends TestSupport {
+
+    BrokerService broker = null;
+    Connection connection = null;
+    ActiveMQTopic topic;
+
+    public void testJMXSubscriptionUnsubscribe() throws Exception {
+        doJMXUnsubscribe(false);
+    }
+
+    public void testJMXSubscriptionUnsubscribeWithRestart() throws Exception {
+        doJMXUnsubscribe(true);
+    }
+
+    public void testConnectionSubscriptionUnsubscribe() throws Exception {
+        doConnectionUnsubscribe(false);
+    }
+
+    public void testConnectionSubscriptionUnsubscribeWithRestart() throws Exception {
+        doConnectionUnsubscribe(true);
+    }
+
+    public void testDirectSubscriptionUnsubscribe() throws Exception {
+        doDirectUnsubscribe(false);
+    }
+
+    public void testDirectubscriptionUnsubscribeWithRestart() throws Exception {
+        doDirectUnsubscribe(true);
+    }
+
+    public void doJMXUnsubscribe(boolean restart) throws Exception {
+        for (int i = 0; i < 100; i++) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId" + i);
+            session.close();
+        }
+
+        Thread.sleep(2 * 1000);
+
+        if (restart) {
+            stopBroker();
+            startBroker(false);
+        }
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers();
+
+        for (ObjectName subscription: subscriptions) {
+            mbs.invoke(subscription, "destroy", null, null);
+        }
+        for (ObjectName subscription: inactive) {
+            mbs.invoke(subscription, "destroy", null, null);
+        }
+
+        Thread.sleep(2 * 1000);
+
+        subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+
+        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+    }
+
+    public void testInactiveSubscriptions() throws Exception {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId");
+
+            ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(1, subscriptions.length);
+
+            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            session.close();
+
+            Thread.sleep(1000);
+
+            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+            assertEquals(1, subscriptions.length);
+
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId");
+
+            Thread.sleep(1000);
+
+            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(1, subscriptions.length);
+
+            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            session.close();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Thread.sleep(1000);
+
+            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+            assertEquals(1, subscriptions.length);
+
+            session.unsubscribe("SubsId");
+
+            Thread.sleep(1000);
+
+            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+            assertEquals(0, subscriptions.length);
+
+            session.close();
+
+    }
+
+    public void doConnectionUnsubscribe(boolean restart) throws Exception {
+        for (int i = 0; i < 100; i++) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId" + i);
+            session.close();
+        }
+
+        Thread.sleep(2 * 1000);
+
+        if (restart) {
+            stopBroker();
+            startBroker(false);
+        }
+
+        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+
+        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        assertEquals(100, subscriptions.length);
+
+        for (int i = 0; i < 100; i++) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.unsubscribe("SubsId" + i);
+            session.close();
+        }
+
+        Thread.sleep(2 * 1000);
+
+        subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+
+        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+    }
+
+    public void doDirectUnsubscribe(boolean restart) throws Exception {
+        for (int i = 0; i < 100; i++) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId" + i);
+            session.close();
+        }
+
+        Thread.sleep(2 * 1000);
+
+        if (restart) {
+            stopBroker();
+            startBroker(false);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+            info.setClientId(getName());
+            info.setSubscriptionName("SubsId" + i);
+            ConnectionContext context = new ConnectionContext();
+            context.setBroker(broker.getRegionBroker());
+            context.setClientId(getName());
+            broker.getRegionBroker().removeSubscription(context, info);
+        }
+
+        Thread.sleep(2 * 1000);
+
+        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+
+        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        assertEquals(0, subscriptions.length);
+    }
+
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+        broker.setUseJmx(true);
+        broker.setBrokerName(getName());
+
+        broker.setPersistent(true);
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
+        broker.setPersistenceAdapter(persistenceAdapter);
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+
+        broker.start();
+
+        connection = createConnection();
+    }
+
+    private void stopBroker() throws Exception {
+        if (connection != null)
+            connection.close();
+        connection = null;
+
+        if (broker != null)
+            broker.stop();
+        broker = null;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        topic = (ActiveMQTopic) createDestination();
+        startBroker(true);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        stopBroker();
+        super.tearDown();
+    }
+
+    @Override
+    protected Connection createConnection() throws Exception {
+        Connection rc = super.createConnection();
+        rc.setClientID(getName());
+        rc.start();
+        return rc;
+    }
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java Wed Nov  3 15:06:21 2010
@@ -63,7 +63,7 @@ public class DurableUnsubscribeTest exte
         assertEquals("Subscription is missing.", 1, d.getConsumers().size());
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName[] subNames = broker.getAdminView().getDurableTopicSubscribers();
+        ObjectName[] subNames = broker.getAdminView().getInactiveDurableTopicSubscribers();
         mbs.invoke(subNames[0], "destroy", new Object[0], new String[0]);
 
         assertEquals("Subscription exists.", 0, d.getConsumers().size());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java Wed Nov  3 15:06:21 2010
@@ -47,24 +47,26 @@ public class ManagedDurableSubscriptionT
         startBroker();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName subscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        ObjectName inactiveSubscriptionObjectName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
 
-        Object active = mbs.getAttribute(subscriptionObjectName, "Active");
-        assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
+        Object inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
+        assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
 
         // activate
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session.createDurableSubscriber(topic, "SubsId");
 
-        active = mbs.getAttribute(subscriptionObjectName, "Active");
+        ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+
+        Object active = mbs.getAttribute(activeSubscriptionObjectName, "Active");
         assertTrue("Subscription is INactive.", Boolean.TRUE.equals(active));
 
         // deactivate
         connection.close();
         connection = null;
 
-        active = mbs.getAttribute(subscriptionObjectName, "Active");
-        assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
+        inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
+        assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
 
     }
 

Modified: activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp (original)
+++ activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp Wed Nov  3 15:06:21 2010
@@ -70,7 +70,7 @@
 </form>
 
 
-<h2>Durable Topic Subscribers</h2>
+<h2>Active Durable Topic Subscribers</h2>
 
 
 <table id="topics" class="sortable autostripe">
@@ -107,6 +107,48 @@
 </td>
 </tr>
 </c:forEach>
+
+</tbody>
+</table>
+
+<h2>Offline Durable Topic Subscribers</h2>
+
+
+<table id="topics" class="sortable autostripe">
+<thead>
+<tr>
+<th>Client ID</th>
+<th>Subscription Name</th>
+<th>Connection ID</th>
+<th>Destination</th>
+<th>Selector</th>
+<th>Pending Queue Size</th>
+<th>Dispatched Queue Size</th>
+<th>Dispatched Counter</th>
+<th>Enqueue Counter</th>
+<th>Dequeue Counter</th>
+<th>Operations</th>
+</tr>
+</thead>
+<tbody>
+<c:forEach items="${requestContext.brokerQuery.inactiveDurableTopicSubscribers}" var="row">
+<tr>
+<td><form:tooltip text="${row.clientId}" length="10"/></td>
+<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
+<td><form:tooltip text="${row.connectionId}" length="10"/></td>
+<td><form:tooltip text="${row.destinationName}" length="10"/></td>
+<td>${row.selector}</td>
+<td>${row.pendingQueueSize}</td>
+<td>${row.dispatchedQueueSize}</td>
+<td>${row.dispachedCounter}</td>
+<td>${row.enqueueCounter}</td>
+<td>${row.dequeueCounter}</td>
+<td>
+    <a href="deleteSubscriber.action?clientId=${row.clientId}&subscriberName=${row.subscriptionName}&secret=<c:out value='${sessionScope["secret"]}'/>">Delete</a>
+</td>
+</tr>
+</c:forEach>
+
 </tbody>
 </table>
 

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java Wed Nov  3 15:06:21 2010
@@ -80,7 +80,7 @@ public interface BrokerFacade {
 			throws Exception;
 
 	/**
-	 * All durable subscribers to topics of the broker.
+	 * Active durable subscribers to topics of the broker.
 	 * 
 	 * @return not <code>null</code>
 	 * @throws Exception
@@ -88,6 +88,16 @@ public interface BrokerFacade {
 	Collection<DurableSubscriptionViewMBean> getDurableTopicSubscribers()
 			throws Exception;
 
+
+	/**
+	 * Inactive durable subscribers to topics of the broker.
+	 *
+	 * @return not <code>null</code>
+	 * @throws Exception
+	 */
+	Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers()
+			throws Exception;
+
 	/**
 	 * The names of all transport connectors of the broker (f.e. openwire, ssl)
 	 * 

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java Wed Nov  3 15:06:21 2010
@@ -76,6 +76,15 @@ public abstract class BrokerFacadeSuppor
         return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
     }
 
+    public Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers() throws Exception {
+        BrokerViewMBean broker = getBrokerAdmin();
+        if (broker == null) {
+            return Collections.EMPTY_LIST;
+        }
+        ObjectName[] queues = broker.getInactiveDurableTopicSubscribers();
+        return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
+    }
+
     public QueueViewMBean getQueue(String name) throws Exception {
         return (QueueViewMBean) getDestinationByName(getQueues(), name);
     }