You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/03 16:06:21 UTC
svn commit: r1030490 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/jmx/
activemq-core/src/main/java/org/apache/activemq/broker/region/
activemq-core/src/test/java/org/apache/activemq/broker/jmx/
activemq-core/src/test/jav...
Author: dejanb
Date: Wed Nov 3 15:06:21 2010
New Revision: 1030490
URL: http://svn.apache.org/viewvc?rev=1030490&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3013 - durable subs and jmx
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java
activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Nov 3 15:06:21 2010
@@ -179,21 +179,29 @@ public class ManagedRegionBroker extends
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
-
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
SubscriptionView view;
- if (sub.getConsumerInfo().isDurable()) {
- view = new DurableSubscriptionView(this, context.getClientId(), sub);
+ if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
+ // add offline subscribers to inactive list
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(context.getClientId());
+ info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
+ info.setDestination(sub.getConsumerInfo().getDestination());
+ addInactiveSubscription(key, info);
} else {
- if (sub instanceof TopicSubscription) {
- view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription)sub);
+ if (sub.getConsumerInfo().isDurable()) {
+ view = new DurableSubscriptionView(this, context.getClientId(), sub);
} else {
- view = new SubscriptionView(context.getClientId(), sub);
+ if (sub instanceof TopicSubscription) {
+ view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
+ } else {
+ view = new SubscriptionView(context.getClientId(), sub);
+ }
}
+ registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
- registerSubscription(objectName, sub.getConsumerInfo(), key, view);
subscriptionMap.put(sub, objectName);
return objectName;
} catch (Exception e) {
@@ -227,11 +235,38 @@ public class ManagedRegionBroker extends
return objectNameStr;
}
+ @Override
+ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ Subscription sub = super.addConsumer(context, info);
+ SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
+ ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
+ if (inactiveName != null) {
+ // if it was inactive, register it
+ registerSubscription(context, sub);
+ }
+ return sub;
+ }
+
+ @Override
+ public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ for (Subscription sub : subscriptionMap.keySet()) {
+ if (sub.getConsumerInfo().equals(info)) {
+ // unregister all consumer subs
+ unregisterSubscription(subscriptionMap.get(sub), true);
+ }
+ }
+ super.removeConsumer(context, info);
+ }
+
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
try {
- unregisterSubscription(name);
+ SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
+ ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
+ if (inactiveName != null) {
+ inactiveDurableTopicSubscribers.remove(inactiveName);
+ }
} catch (Exception e) {
LOG.error("Failed to unregister subscription " + sub, e);
}
@@ -337,10 +372,9 @@ public class ManagedRegionBroker extends
}
- protected void unregisterSubscription(ObjectName key) throws Exception {
+ protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
queueSubscribers.remove(key);
topicSubscribers.remove(key);
- inactiveDurableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
if (registeredMBeans.remove(key)) {
@@ -355,11 +389,13 @@ public class ManagedRegionBroker extends
if (view != null) {
// need to put this back in the inactive list
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
- SubscriptionInfo info = new SubscriptionInfo();
- info.setClientId(subscriptionKey.getClientId());
- info.setSubscriptionName(subscriptionKey.getSubscriptionName());
- info.setDestination(new ActiveMQTopic(view.getDestinationName()));
- addInactiveSubscription(subscriptionKey, info);
+ if (addToInactive) {
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(subscriptionKey.getClientId());
+ info.setSubscriptionName(subscriptionKey.getSubscriptionName());
+ info.setDestination(new ActiveMQTopic(view.getDestinationName()));
+ addInactiveSubscription(subscriptionKey, info);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Nov 3 15:06:21 2010
@@ -138,7 +138,6 @@ public class TopicRegion extends Abstrac
throw new JMSException("Durable consumer is in use");
}
- durableSubscriptions.remove(key);
synchronized (destinationsMutex) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Destination dest = iter.next();
@@ -149,7 +148,12 @@ public class TopicRegion extends Abstrac
}
}
}
- super.removeConsumer(context, sub.getConsumerInfo());
+ if (subscriptions.get(sub.getConsumerInfo()) != null) {
+ super.removeConsumer(context, sub.getConsumerInfo());
+ } else {
+ // try destroying inactive subscriptions
+ destroySubscription(sub);
+ }
}
@Override
@@ -159,7 +163,6 @@ public class TopicRegion extends Abstrac
@Override
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
-
List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
@@ -210,7 +213,6 @@ public class TopicRegion extends Abstrac
}
}
}
-
return rc;
}
@@ -250,6 +252,7 @@ public class TopicRegion extends Abstrac
if (sub == null) {
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
+
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Nov 3 15:06:21 2010
@@ -442,7 +442,7 @@ public class MBeanTest extends EmbeddedB
String selector = null;
ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector);
broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
- assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
+ assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length);
assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
@@ -450,7 +450,7 @@ public class MBeanTest extends EmbeddedB
// now lets try destroy it
broker.destroyDurableSubscriber(clientID, "subscriber1");
- assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
+ assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length);
}
protected void assertConsumerCounts() throws Exception {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1030490&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java Wed Nov 3 15:06:21 2010
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+
+public class DurableSubscriptionUnsubscribeTest extends TestSupport {
+
+ BrokerService broker = null;
+ Connection connection = null;
+ ActiveMQTopic topic;
+
+ public void testJMXSubscriptionUnsubscribe() throws Exception {
+ doJMXUnsubscribe(false);
+ }
+
+ public void testJMXSubscriptionUnsubscribeWithRestart() throws Exception {
+ doJMXUnsubscribe(true);
+ }
+
+ public void testConnectionSubscriptionUnsubscribe() throws Exception {
+ doConnectionUnsubscribe(false);
+ }
+
+ public void testConnectionSubscriptionUnsubscribeWithRestart() throws Exception {
+ doConnectionUnsubscribe(true);
+ }
+
+ public void testDirectSubscriptionUnsubscribe() throws Exception {
+ doDirectUnsubscribe(false);
+ }
+
+ public void testDirectubscriptionUnsubscribeWithRestart() throws Exception {
+ doDirectUnsubscribe(true);
+ }
+
+ public void doJMXUnsubscribe(boolean restart) throws Exception {
+ for (int i = 0; i < 100; i++) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId" + i);
+ session.close();
+ }
+
+ Thread.sleep(2 * 1000);
+
+ if (restart) {
+ stopBroker();
+ startBroker(false);
+ }
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers();
+
+ for (ObjectName subscription: subscriptions) {
+ mbs.invoke(subscription, "destroy", null, null);
+ }
+ for (ObjectName subscription: inactive) {
+ mbs.invoke(subscription, "destroy", null, null);
+ }
+
+ Thread.sleep(2 * 1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+ }
+
+ public void testInactiveSubscriptions() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
+ ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(1, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ session.close();
+
+ Thread.sleep(1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(1, subscriptions.length);
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
+ Thread.sleep(1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(1, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ session.close();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Thread.sleep(1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(1, subscriptions.length);
+
+ session.unsubscribe("SubsId");
+
+ Thread.sleep(1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ session.close();
+
+ }
+
+ public void doConnectionUnsubscribe(boolean restart) throws Exception {
+ for (int i = 0; i < 100; i++) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId" + i);
+ session.close();
+ }
+
+ Thread.sleep(2 * 1000);
+
+ if (restart) {
+ stopBroker();
+ startBroker(false);
+ }
+
+ ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(100, subscriptions.length);
+
+ for (int i = 0; i < 100; i++) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe("SubsId" + i);
+ session.close();
+ }
+
+ Thread.sleep(2 * 1000);
+
+ subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+ }
+
+ public void doDirectUnsubscribe(boolean restart) throws Exception {
+ for (int i = 0; i < 100; i++) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId" + i);
+ session.close();
+ }
+
+ Thread.sleep(2 * 1000);
+
+ if (restart) {
+ stopBroker();
+ startBroker(false);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+ info.setClientId(getName());
+ info.setSubscriptionName("SubsId" + i);
+ ConnectionContext context = new ConnectionContext();
+ context.setBroker(broker.getRegionBroker());
+ context.setClientId(getName());
+ broker.getRegionBroker().removeSubscription(context, info);
+ }
+
+ Thread.sleep(2 * 1000);
+
+ ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+
+ subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ assertEquals(0, subscriptions.length);
+ }
+
+ private void startBroker(boolean deleteMessages) throws Exception {
+ broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+ broker.setUseJmx(true);
+ broker.setBrokerName(getName());
+
+ broker.setPersistent(true);
+ KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
+ broker.setPersistenceAdapter(persistenceAdapter);
+ if (deleteMessages) {
+ broker.setDeleteAllMessagesOnStartup(true);
+ }
+
+ broker.start();
+
+ connection = createConnection();
+ }
+
+ private void stopBroker() throws Exception {
+ if (connection != null)
+ connection.close();
+ connection = null;
+
+ if (broker != null)
+ broker.stop();
+ broker = null;
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ topic = (ActiveMQTopic) createDestination();
+ startBroker(true);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ stopBroker();
+ super.tearDown();
+ }
+
+ @Override
+ protected Connection createConnection() throws Exception {
+ Connection rc = super.createConnection();
+ rc.setClientID(getName());
+ rc.start();
+ return rc;
+ }
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java Wed Nov 3 15:06:21 2010
@@ -63,7 +63,7 @@ public class DurableUnsubscribeTest exte
assertEquals("Subscription is missing.", 1, d.getConsumers().size());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName[] subNames = broker.getAdminView().getDurableTopicSubscribers();
+ ObjectName[] subNames = broker.getAdminView().getInactiveDurableTopicSubscribers();
mbs.invoke(subNames[0], "destroy", new Object[0], new String[0]);
assertEquals("Subscription exists.", 0, d.getConsumers().size());
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ManagedDurableSubscriptionTest.java Wed Nov 3 15:06:21 2010
@@ -47,24 +47,26 @@ public class ManagedDurableSubscriptionT
startBroker();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName subscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ ObjectName inactiveSubscriptionObjectName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
- Object active = mbs.getAttribute(subscriptionObjectName, "Active");
- assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
+ Object inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
+ assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
// activate
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
- active = mbs.getAttribute(subscriptionObjectName, "Active");
+ ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+
+ Object active = mbs.getAttribute(activeSubscriptionObjectName, "Active");
assertTrue("Subscription is INactive.", Boolean.TRUE.equals(active));
// deactivate
connection.close();
connection = null;
- active = mbs.getAttribute(subscriptionObjectName, "Active");
- assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
+ inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
+ assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
}
Modified: activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp (original)
+++ activemq/trunk/activemq-web-console/src/main/webapp/subscribers.jsp Wed Nov 3 15:06:21 2010
@@ -70,7 +70,7 @@
</form>
-<h2>Durable Topic Subscribers</h2>
+<h2>Active Durable Topic Subscribers</h2>
<table id="topics" class="sortable autostripe">
@@ -107,6 +107,48 @@
</td>
</tr>
</c:forEach>
+
+</tbody>
+</table>
+
+<h2>Offline Durable Topic Subscribers</h2>
+
+
+<table id="topics" class="sortable autostripe">
+<thead>
+<tr>
+<th>Client ID</th>
+<th>Subscription Name</th>
+<th>Connection ID</th>
+<th>Destination</th>
+<th>Selector</th>
+<th>Pending Queue Size</th>
+<th>Dispatched Queue Size</th>
+<th>Dispatched Counter</th>
+<th>Enqueue Counter</th>
+<th>Dequeue Counter</th>
+<th>Operations</th>
+</tr>
+</thead>
+<tbody>
+<c:forEach items="${requestContext.brokerQuery.inactiveDurableTopicSubscribers}" var="row">
+<tr>
+<td><form:tooltip text="${row.clientId}" length="10"/></td>
+<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
+<td><form:tooltip text="${row.connectionId}" length="10"/></td>
+<td><form:tooltip text="${row.destinationName}" length="10"/></td>
+<td>${row.selector}</td>
+<td>${row.pendingQueueSize}</td>
+<td>${row.dispatchedQueueSize}</td>
+<td>${row.dispachedCounter}</td>
+<td>${row.enqueueCounter}</td>
+<td>${row.dequeueCounter}</td>
+<td>
+ <a href="deleteSubscriber.action?clientId=${row.clientId}&subscriberName=${row.subscriptionName}&secret=<c:out value='${sessionScope["secret"]}'/>">Delete</a>
+</td>
+</tr>
+</c:forEach>
+
</tbody>
</table>
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java Wed Nov 3 15:06:21 2010
@@ -80,7 +80,7 @@ public interface BrokerFacade {
throws Exception;
/**
- * All durable subscribers to topics of the broker.
+ * Active durable subscribers to topics of the broker.
*
* @return not <code>null</code>
* @throws Exception
@@ -88,6 +88,16 @@ public interface BrokerFacade {
Collection<DurableSubscriptionViewMBean> getDurableTopicSubscribers()
throws Exception;
+
+ /**
+ * Inactive durable subscribers to topics of the broker.
+ *
+ * @return not <code>null</code>
+ * @throws Exception
+ */
+ Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers()
+ throws Exception;
+
/**
* The names of all transport connectors of the broker (f.e. openwire, ssl)
*
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1030490&r1=1030489&r2=1030490&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java Wed Nov 3 15:06:21 2010
@@ -76,6 +76,15 @@ public abstract class BrokerFacadeSuppor
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
+ public Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers() throws Exception {
+ BrokerViewMBean broker = getBrokerAdmin();
+ if (broker == null) {
+ return Collections.EMPTY_LIST;
+ }
+ ObjectName[] queues = broker.getInactiveDurableTopicSubscribers();
+ return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
+ }
+
public QueueViewMBean getQueue(String name) throws Exception {
return (QueueViewMBean) getDestinationByName(getQueues(), name);
}