You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/02 11:31:26 UTC
svn commit: r382344 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
jmx/ region/
Author: rajdavies
Date: Thu Mar 2 02:31:23 2006
New Revision: 382344
URL: http://svn.apache.org/viewcvs?rev=382344&view=rev
Log:
Added support for viewing inactive durable consumers
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Mar 2 02:31:23 2006
@@ -117,4 +117,8 @@
return broker.getTemporaryQueueSubscribers();
}
+ public ObjectName[] getInactiveDurableTopicSubscribers(){
+ return broker.getInactiveDurableTopicSubscribers();
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Mar 2 02:31:23 2006
@@ -45,6 +45,7 @@
public ObjectName[] getTopicSubscribers();
public ObjectName[] getDurableTopicSubscribers();
+ public ObjectName[] getInactiveDurableTopicSubscribers();
public ObjectName[] getQueueSubscribers();
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Thu Mar 2 02:31:23 2006
@@ -23,10 +23,16 @@
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected String subscriptionName;
- public DurableSubscriptionView(Subscription sub){
- super(sub);
+ /**
+ * Constructor
+ * @param clientId
+ * @param sub
+ */
+ public DurableSubscriptionView(String clientId,Subscription sub){
+ super(clientId,sub);
this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
}
+
/**
* @return name of the durable consumer
*/
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=382344&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Thu Mar 2 02:31:23 2006
@@ -0,0 +1,102 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import org.apache.activemq.command.SubscriptionInfo;
+/**
+ * JMX view of a durable topic subscription that is not currently active.
+ * The view is backed by a stored SubscriptionInfo instead of a live Subscription:
+ * the constructor passes a null subscription to the superclass and keeps the
+ * given SubscriptionInfo in {@link #info}. Destination and subscription names are
+ * read from that info; the remaining attributes overridden here are fixed values.
+ *
+ * @version $Revision: 1.5 $
+ */
+public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
+
+ protected SubscriptionInfo info;
+ public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
+ super(clientId,null);
+ this.info = sub;
+ }
+
+
+
+
+ /**
+ * @return the id of the Subscription; always -1 for an inactive subscription
+ */
+ public long getSubcriptionId(){
+ return -1;
+ }
+
+ /**
+ * @return the physical name of the destination held in the subscription info
+ */
+ public String getDestinationName(){
+ return info.getDestination().getPhysicalName();
+
+ }
+
+ /**
+ * @return true if the destination is a Queue; always false for this view
+ */
+ public boolean isDestinationQueue(){
+ return false;
+ }
+
+ /**
+ * @return true if the destination is a Topic; always true for this view
+ */
+ public boolean isDestinationTopic(){
+ return true;
+ }
+
+ /**
+ * @return true if the destination is temporary; always false for this view
+ */
+ public boolean isDestinationTemporary(){
+ return false;
+ }
+ /**
+ * @return name of the durable consumer, as held in the subscription info
+ */
+ public String getSubscriptionName(){
+ return info.getSubcriptionName();
+ }
+
+ /**
+ * @return true if the subscriber is active; always false for this view
+ */
+ public boolean isActive(){
+ return false;
+ }
+
+ /**
+ * Browse messages for this durable subscriber.
+ *
+ * @return messages; this implementation always returns null
+ * @throws OpenDataException declared by the signature, not thrown by this implementation
+ */
+ public CompositeData[] browse() throws OpenDataException{
+ return null;
+ }
+
+ /**
+ * Browse messages for this durable subscriber in tabular form.
+ *
+ * @return messages; this implementation always returns null
+ * @throws OpenDataException declared by the signature, not thrown by this implementation
+ */
+ public TabularData browseAsTable() throws OpenDataException{
+ return null;
+ }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Thu Mar 2 02:31:23 2006
@@ -41,7 +41,7 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
- regionBroker.registerSubscription(sub);
+ regionBroker.registerSubscription(context,sub);
return sub;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar 2 02:31:23 2006
@@ -14,13 +14,19 @@
package org.apache.activemq.broker.jmx;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Hashtable;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@@ -28,11 +34,15 @@
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@@ -47,8 +57,11 @@
private final Map queueSubscribers=new ConcurrentHashMap();
private final Map topicSubscribers=new ConcurrentHashMap();
private final Map durableTopicSubscribers=new ConcurrentHashMap();
+ private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
+ private final Map subscriptionKeys = new ConcurrentHashMap();
+ private final Map subscriptionMap = new ConcurrentHashMap();
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@@ -57,6 +70,13 @@
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}
+
+ public void start() throws Exception {
+ super.start();
+ //build all existing durable subscriptions
+ buildExistingSubscriptions();
+
+ }
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter,PolicyMap policyMap){
@@ -108,33 +128,37 @@
}
}
- public void registerSubscription(Subscription sub){
+ public void registerSubscription(ConnectionContext context,Subscription sub){
+ // NEED CONTEXT TO GET CLIENT ID AND USE Subscription KEY!!!
+ SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
- map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
+ String name = key.toString() + ":" + sub.getConsumerInfo().toString();
+ map.put("name",JMXSupport.encodeObjectNamePart(name));
+ map.put("active", "true");
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){
- view=new DurableSubscriptionView(sub);
+ view=new DurableSubscriptionView(context.getClientId(),sub);
}else{
- view=new SubscriptionView(sub);
+ view=new SubscriptionView(context.getClientId(),sub);
}
- registerSubscription(objectName,sub.getConsumerInfo(),view);
+ subscriptionMap.put(sub,objectName);
+ registerSubscription(objectName,sub.getConsumerInfo(),key,view);
}catch(Exception e){
log.error("Failed to register subscription "+sub,e);
}
}
public void unregisterSubscription(Subscription sub){
- Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
- map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
- map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
- try{
- ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
- unregisterSubscription(objectName);
- }catch(Exception e){
- log.error("Failed to unregister subscription "+sub,e);
+ // Look up the ObjectName that registerSubscription stored for this subscription and
+ // unregister it. remove() is used instead of get() so the entry does not outlive the
+ // subscription; otherwise subscriptionMap would keep a reference to every closed Subscription.
+ // A subscription with no recorded name is silently ignored.
+ ObjectName name=(ObjectName) subscriptionMap.remove(sub);
+ if(name!=null){
+ try{
+ unregisterSubscription(name);
+ }catch(Exception e){
+ log.error("Failed to unregister subscription "+sub,e);
+ }
}
}
@@ -163,7 +187,7 @@
mbeanServer.unregisterMBean(key);
}
- protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{
+ protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{
ActiveMQDestination dest=info.getDestination();
if(dest.isQueue()){
if(dest.isTemporary()){
@@ -177,6 +201,16 @@
}else{
if(info.isDurable()){
durableTopicSubscribers.put(key,view);
+ //unregister any inactive durable subs
+ try {
+ ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
+ if (inactiveName != null){
+ inactiveDurableTopicSubscribers.remove(inactiveName);
+ mbeanServer.unregisterMBean(inactiveName);
+ }
+ }catch(Exception e){
+ log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e);
+ }
}else{
topicSubscribers.put(key,view);
}
@@ -188,10 +222,67 @@
protected void unregisterSubscription(ObjectName key) throws Exception{
queueSubscribers.remove(key);
topicSubscribers.remove(key);
- durableTopicSubscribers.remove(key);
+ inactiveDurableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
mbeanServer.unregisterMBean(key);
+ DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
+ if (view != null){
+ //need to put this back in the inactive list
+ SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(subscriptionKey.getClientId());
+ info.setSubcriptionName(subscriptionKey.getSubscriptionName());
+ info.setDestination(new ActiveMQTopic(view.getDestinationName()));
+ addInactiveSubscription(subscriptionKey, info);
+ }
+
+
+ }
+
+ /**
+ * Registers an inactive-subscription view for every durable subscription already
+ * held by the persistence adapter. Called from start().
+ * Each topic destination reported by the adapter is asked for its stored
+ * SubscriptionInfo entries; these are collected in a map keyed by SubscriptionKey
+ * (so a key seen more than once is registered only once) and then passed to
+ * addInactiveSubscription.
+ *
+ * @throws Exception if reading destinations or subscriptions from the persistence adapter fails
+ */
+ protected void buildExistingSubscriptions() throws Exception{
+ Map subscriptions = new HashMap();
+ Set destinations = adaptor.getDestinations();
+ if (destinations != null){
+ for (Iterator iter = destinations.iterator(); iter.hasNext();){
+ ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+ if (dest.isTopic()){
+ TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest);
+ SubscriptionInfo[] infos = store.getAllSubscriptions();
+ if (infos != null){
+ for (int i = 0; i < infos.length; i++) {
+
+ SubscriptionInfo info = infos[i];
+ // log the individual subscription, not the enclosing array
+ log.debug("Restoring durable subscription: "+info);
+ SubscriptionKey key = new SubscriptionKey(info);
+ subscriptions.put(key,info);
+ }
+ }
+ }
+ }
+ }
+ for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){
+ Map.Entry entry = (Entry) i.next();
+ SubscriptionKey key = (SubscriptionKey) entry.getKey();
+ SubscriptionInfo info = (SubscriptionInfo) entry.getValue();
+ addInactiveSubscription(key, info);
+ }
+ }
+
+ protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
+ Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
+ map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
+ map.put("name",JMXSupport.encodeObjectNamePart(key.toString()));
+ map.put("active", "false");
+ try{
+ ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+ SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info);
+ mbeanServer.registerMBean(view,objectName);
+ inactiveDurableTopicSubscribers.put(objectName,view);
+ subscriptionKeys.put(key, objectName);
+ }catch(Exception e){
+ log.error("Failed to register subscription "+info,e);
+ }
}
protected ObjectName[] getTopics(){
@@ -229,6 +320,11 @@
}
protected ObjectName[] getTemporaryQueueSubscribers(){
Set set = temporaryQueueSubscribers.keySet();
+ return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ }
+
+ protected ObjectName[] getInactiveDurableTopicSubscribers(){
+ Set set = inactiveDurableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Thu Mar 2 02:31:23 2006
@@ -39,7 +39,7 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
- regionBroker.registerSubscription(sub);
+ regionBroker.registerSubscription(context,sub);
return sub;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Thu Mar 2 02:31:23 2006
@@ -39,7 +39,7 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
- regionBroker.registerSubscription(sub);
+ regionBroker.registerSubscription(context,sub);
return sub;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Thu Mar 2 02:31:23 2006
@@ -41,7 +41,7 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
- regionBroker.registerSubscription(sub);
+ regionBroker.registerSubscription(context,sub);
return sub;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Thu Mar 2 02:31:23 2006
@@ -29,22 +29,30 @@
protected final Subscription subscription;
-
+ protected final String clientId;
/**
* Constructor
* @param subs
*/
- public SubscriptionView(Subscription subs){
+ public SubscriptionView(String clientId,Subscription subs){
+ this.clientId = clientId;
this.subscription = subs;
}
/**
+ * @return the clientId
+ */
+ public String getClientId(){
+ return clientId;
+ }
+
+ /**
* @return the id of the Connection the Subscription is on
*/
public String getConnectionId(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getConnectionId();
}
@@ -55,7 +63,7 @@
* @return the id of the Session the subscription is on
*/
public long getSessionId(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getSessionId();
}
@@ -66,7 +74,7 @@
* @return the id of the Subscription
*/
public long getSubcriptionId(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getValue();
}
@@ -77,7 +85,7 @@
* @return the destination name
*/
public String getDestinationName(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName();
@@ -90,7 +98,7 @@
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isQueue();
@@ -102,7 +110,7 @@
* @return true if the destination is a Topic
*/
public boolean isDestinationTopic(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTopic();
@@ -114,41 +122,54 @@
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary(){
- ConsumerInfo info = subscription.getConsumerInfo();
+ ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTemporary();
}
return false;
}
+
+ /**
+ * @return true if the subscriber is active
+ */
+ public boolean isActive(){
+ return true;
+ }
/**
* The subscription should release as many references as it can to help the garbage collector
* reclaim memory.
*/
public void gc(){
+ if (subscription != null){
subscription.gc();
+ }
}
/**
* @return number of messages pending delivery
*/
public int getPending(){
- return subscription.pending();
+ return subscription != null ? subscription.pending() : 0;
}
/**
* @return number of messages dispatched
*/
public int getDispatched(){
- return subscription.dispatched();
+ return subscription != null ? subscription.dispatched() : 0;
}
/**
* @return number of messages delivered
*/
public int getDelivered(){
- return subscription.delivered();
+ return subscription != null ? subscription.delivered() : 0;
+ }
+
+ protected ConsumerInfo getConsumerInfo(){
+ return subscription != null ? subscription.getConsumerInfo() : null;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java Thu Mar 2 02:31:23 2006
@@ -17,6 +17,11 @@
* @version $Revision: 1.5 $
*/
public interface SubscriptionViewMBean{
+
+ /**
+ * @return the clientId
+ */
+ public String getClientId();
/**
* @return the id of the Connection the Subscription is on
*/
@@ -51,6 +56,11 @@
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary();
+
+ /**
+ * @return true if the subscriber is active
+ */
+ public boolean isActive();
/**
* The subscription should release as many references as it can to help the garbage collector reclaim memory.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Mar 2 02:31:23 2006
@@ -81,7 +81,7 @@
private BrokerId brokerId;
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
- private PersistenceAdapter adaptor;
+ protected PersistenceAdapter adaptor;
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);