You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/02 11:31:26 UTC

svn commit: r382344 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/

Author: rajdavies
Date: Thu Mar  2 02:31:23 2006
New Revision: 382344

URL: http://svn.apache.org/viewcvs?rev=382344&view=rev
Log:
Added support for view inactive durable consumers

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Mar  2 02:31:23 2006
@@ -117,4 +117,8 @@
         return broker.getTemporaryQueueSubscribers();
     }
     
+    public ObjectName[] getInactiveDurableTopicSubscribers(){
+        return broker.getInactiveDurableTopicSubscribers();
+    }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Mar  2 02:31:23 2006
@@ -45,6 +45,7 @@
     
     public ObjectName[] getTopicSubscribers();
     public ObjectName[] getDurableTopicSubscribers();
+    public ObjectName[] getInactiveDurableTopicSubscribers();
     public ObjectName[] getQueueSubscribers();
     public ObjectName[] getTemporaryTopicSubscribers();
     public ObjectName[] getTemporaryQueueSubscribers();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Thu Mar  2 02:31:23 2006
@@ -23,10 +23,16 @@
 public class DurableSubscriptionView extends SubscriptionView implements  DurableSubscriptionViewMBean {
     
     protected String subscriptionName;
-    public DurableSubscriptionView(Subscription sub){
-        super(sub);
+    /**
+     * Constructor
+     * @param clientId
+     * @param sub
+     */
+    public DurableSubscriptionView(String clientId,Subscription sub){
+        super(clientId,sub);
         this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
     }
+    
     /**
      * @return name of the durable consumer
      */

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=382344&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Thu Mar  2 02:31:23 2006
@@ -0,0 +1,102 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import org.apache.activemq.command.SubscriptionInfo;
+/**
+ * @version $Revision: 1.5 $
+ */
+public class InactiveDurableSubscriptionView extends SubscriptionView implements  DurableSubscriptionViewMBean {
+    
+    protected SubscriptionInfo info;
+    public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
+        super(clientId,null);
+        this.info = sub;
+    }
+    
+    
+
+    
+    /**
+     * @return the id of the Subscription
+     */
+    public long getSubcriptionId(){
+        return -1;
+    }
+
+    /**
+     * @return the destination name
+     */
+    public String getDestinationName(){
+        return info.getDestination().getPhysicalName();
+       
+    }
+
+    /**
+     * @return true if the destination is a Queue
+     */
+    public boolean isDestinationQueue(){
+        return false;
+    }
+
+    /**
+     * @return true of the destination is a Topic
+     */
+    public boolean isDestinationTopic(){
+        return true;
+    }
+
+    /**
+     * @return true if the destination is temporary
+     */
+    public boolean isDestinationTemporary(){
+        return false;
+    }
+    /**
+     * @return name of the durable consumer
+     */
+    public String getSubscriptionName(){
+        return info.getSubcriptionName();
+    }
+    
+    /**
+     * @return true if the subscriber is active
+     */
+    public boolean isActive(){
+        return false;
+    }
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public CompositeData[] browse() throws OpenDataException{
+        return null;
+    }
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public TabularData browseAsTable() throws OpenDataException{
+        return null;
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Thu Mar  2 02:31:23 2006
@@ -41,7 +41,7 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         Subscription sub = super.createSubscription(context, info);
-        regionBroker.registerSubscription(sub);
+        regionBroker.registerSubscription(context,sub);
         return sub;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar  2 02:31:23 2006
@@ -14,13 +14,19 @@
 package org.apache.activemq.broker.jmx;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -28,11 +34,15 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@@ -47,8 +57,11 @@
     private final Map queueSubscribers=new ConcurrentHashMap();
     private final Map topicSubscribers=new ConcurrentHashMap();
     private final Map durableTopicSubscribers=new ConcurrentHashMap();
+    private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
     private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
     private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
+    private final Map subscriptionKeys = new ConcurrentHashMap();
+    private final Map subscriptionMap = new ConcurrentHashMap();
 
     public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
                     TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@@ -57,6 +70,13 @@
         this.mbeanServer=mbeanServer;
         this.brokerObjectName=brokerObjectName;
     }
+    
+    public void start() throws Exception {
+        super.start();
+        //build all existing durable subscriptions
+        buildExistingSubscriptions();
+        
+    }
 
     protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
                     PersistenceAdapter adapter,PolicyMap policyMap){
@@ -108,33 +128,37 @@
         }
     }
 
-    public void registerSubscription(Subscription sub){
+    public void registerSubscription(ConnectionContext context,Subscription sub){
+       // NEED CONTEXT TO GET CLIENT ID AND USE Subscription KEY!!!
+        SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
         Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
         map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
-        map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
+        String name = key.toString() + ":" + sub.getConsumerInfo().toString();
+        map.put("name",JMXSupport.encodeObjectNamePart(name));
+        map.put("active", "true");
         try{
             ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
             SubscriptionView view;
             if(sub.getConsumerInfo().isDurable()){
-                view=new DurableSubscriptionView(sub);
+                view=new DurableSubscriptionView(context.getClientId(),sub);
             }else{
-                view=new SubscriptionView(sub);
+                view=new SubscriptionView(context.getClientId(),sub);
             }
-            registerSubscription(objectName,sub.getConsumerInfo(),view);
+            subscriptionMap.put(sub,objectName);
+            registerSubscription(objectName,sub.getConsumerInfo(),key,view);
         }catch(Exception e){
             log.error("Failed to register subscription "+sub,e);
         }
     }
 
     public void unregisterSubscription(Subscription sub){
-        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
-        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
-        map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
-        try{
-            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
-            unregisterSubscription(objectName);
-        }catch(Exception e){
-            log.error("Failed to unregister subscription "+sub,e);
+        ObjectName name=(ObjectName) subscriptionMap.get(sub);
+        if(name!=null){
+            try{
+                unregisterSubscription(name);
+            }catch(Exception e){
+                log.error("Failed to unregister subscription "+sub,e);
+            }
         }
     }
 
@@ -163,7 +187,7 @@
         mbeanServer.unregisterMBean(key);
     }
 
-    protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{
+    protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{
         ActiveMQDestination dest=info.getDestination();
         if(dest.isQueue()){
             if(dest.isTemporary()){
@@ -177,6 +201,16 @@
             }else{
                 if(info.isDurable()){
                     durableTopicSubscribers.put(key,view);
+                    //unregister any inactive durable subs
+                    try {
+                        ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
+                        if (inactiveName != null){
+                            inactiveDurableTopicSubscribers.remove(inactiveName);
+                            mbeanServer.unregisterMBean(inactiveName);
+                        }
+                    }catch(Exception e){
+                        log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e);
+                    }
                 }else{
                     topicSubscribers.put(key,view);
                 }
@@ -188,10 +222,67 @@
     protected void unregisterSubscription(ObjectName key) throws Exception{
         queueSubscribers.remove(key);
         topicSubscribers.remove(key);
-        durableTopicSubscribers.remove(key);
+        inactiveDurableTopicSubscribers.remove(key);
         temporaryQueueSubscribers.remove(key);
         temporaryTopicSubscribers.remove(key);
         mbeanServer.unregisterMBean(key);
+        DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
+        if (view != null){
+            //need to put this back in the inactive list
+            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
+            SubscriptionInfo info = new SubscriptionInfo();
+            info.setClientId(subscriptionKey.getClientId());
+            info.setSubcriptionName(subscriptionKey.getSubscriptionName());
+            info.setDestination(new ActiveMQTopic(view.getDestinationName()));
+            addInactiveSubscription(subscriptionKey, info);
+        }
+        
+       
+    }
+    
+    protected void buildExistingSubscriptions() throws Exception{
+        Map subscriptions = new HashMap();
+        Set destinations = adaptor.getDestinations();
+        if (destinations != null){
+            for (Iterator iter = destinations.iterator(); iter.hasNext();){
+                ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+                if (dest.isTopic()){
+                    TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest);
+                    SubscriptionInfo[] infos = store.getAllSubscriptions();
+                    if (infos != null){
+                        for (int i = 0; i < infos.length; i++) {
+                            
+                            SubscriptionInfo info = infos[i];
+                            log.debug("Restoring durable subscription: "+infos);
+                            SubscriptionKey key = new SubscriptionKey(info);
+                            subscriptions.put(key,info);
+                        }   
+                    }
+                }
+            }
+        }
+        for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){
+            Map.Entry entry = (Entry) i.next();
+            SubscriptionKey key = (SubscriptionKey) entry.getKey();
+            SubscriptionInfo info = (SubscriptionInfo) entry.getValue();
+            addInactiveSubscription(key, info);
+        }
+    }
+    
+    protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
+        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
+        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
+        map.put("name",JMXSupport.encodeObjectNamePart(key.toString()));
+        map.put("active", "false");
+        try{
+            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+            SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info);
+            mbeanServer.registerMBean(view,objectName);
+            inactiveDurableTopicSubscribers.put(objectName,view);
+            subscriptionKeys.put(key, objectName);
+        }catch(Exception e){
+            log.error("Failed to register subscription "+info,e);
+        }
     }
     
     protected  ObjectName[] getTopics(){
@@ -229,6 +320,11 @@
     }
     protected ObjectName[] getTemporaryQueueSubscribers(){
         Set set = temporaryQueueSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    
+    protected ObjectName[] getInactiveDurableTopicSubscribers(){
+        Set set = inactiveDurableTopicSubscribers.keySet();
         return (ObjectName[])set.toArray(new ObjectName[set.size()]);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Thu Mar  2 02:31:23 2006
@@ -39,7 +39,7 @@
     
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         Subscription sub = super.createSubscription(context, info);
-        regionBroker.registerSubscription(sub);
+        regionBroker.registerSubscription(context,sub);
         return sub;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Thu Mar  2 02:31:23 2006
@@ -39,7 +39,7 @@
     
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
         Subscription sub = super.createSubscription(context, info);
-        regionBroker.registerSubscription(sub);
+        regionBroker.registerSubscription(context,sub);
         return sub;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Thu Mar  2 02:31:23 2006
@@ -41,7 +41,7 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
         Subscription sub = super.createSubscription(context, info);
-        regionBroker.registerSubscription(sub);
+        regionBroker.registerSubscription(context,sub);
         return sub;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Thu Mar  2 02:31:23 2006
@@ -29,22 +29,30 @@
     
     
     protected final Subscription subscription;
-    
+    protected final String clientId;
     
     
     /**
      * Constructior
      * @param subs
      */
-    public SubscriptionView(Subscription subs){
+    public SubscriptionView(String clientId,Subscription subs){
+        this.clientId = clientId;
         this.subscription = subs;
     }
     
     /**
+     * @return the clientId
+     */
+    public String getClientId(){
+        return clientId;
+    }
+    
+    /**
      * @return the id of the Connection the Subscription is on
      */
     public String getConnectionId(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             return info.getConsumerId().getConnectionId();
         }
@@ -55,7 +63,7 @@
      * @return the id of the Session the subscription is on
      */
     public long getSessionId(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             return info.getConsumerId().getSessionId();
         }
@@ -66,7 +74,7 @@
      * @return the id of the Subscription
      */
     public long getSubcriptionId(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             return info.getConsumerId().getValue();
         }
@@ -77,7 +85,7 @@
      * @return the destination name
      */
     public String getDestinationName(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             ActiveMQDestination dest = info.getDestination();
             return dest.getPhysicalName();
@@ -90,7 +98,7 @@
      * @return true if the destination is a Queue
      */
     public boolean isDestinationQueue(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             ActiveMQDestination dest = info.getDestination();
             return dest.isQueue();
@@ -102,7 +110,7 @@
      * @return true of the destination is a Topic
      */
     public boolean isDestinationTopic(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             ActiveMQDestination dest = info.getDestination();
             return dest.isTopic();
@@ -114,41 +122,54 @@
      * @return true if the destination is temporary
      */
     public boolean isDestinationTemporary(){
-        ConsumerInfo info = subscription.getConsumerInfo();
+        ConsumerInfo info = getConsumerInfo();
         if (info != null){
             ActiveMQDestination dest = info.getDestination();
             return dest.isTemporary();
         }
         return false;
     }
+    
+    /**
+     * @return true if the subscriber is active
+     */
+    public boolean isActive(){
+        return true;
+    }
 
     /**
      * The subscription should release as may references as it can to help the garbage collector
      * reclaim memory.
      */
     public void gc(){
+        if (subscription != null){
         subscription.gc();
+        }
     }
     
     /**
      * @return number of messages pending delivery
      */
     public int getPending(){
-        return subscription.pending();
+        return subscription != null ? subscription.pending() : 0;
     }
     
     /**
      * @return number of messages dispatched
      */
     public int getDispatched(){
-        return subscription.dispatched();
+        return subscription != null ? subscription.dispatched() : 0;
     }
     
     /**
      * @return number of messages delivered
      */
     public int getDelivered(){
-        return subscription.delivered();
+        return subscription != null ? subscription.delivered() : 0;
+    }
+    
+    protected ConsumerInfo getConsumerInfo(){
+        return subscription != null ? subscription.getConsumerInfo() : null;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java Thu Mar  2 02:31:23 2006
@@ -17,6 +17,11 @@
  * @version $Revision: 1.5 $
  */
 public interface SubscriptionViewMBean{
+    
+    /**
+     * @return the clientId
+     */
+    public String getClientId();
     /**
      * @return the id of the Connection the Subscription is on
      */
@@ -51,6 +56,11 @@
      * @return true if the destination is temporary
      */
     public boolean isDestinationTemporary();
+    
+    /**
+     * @return true if the subscriber is active
+     */
+    public boolean isActive();
 
     /**
      * The subscription should release as may references as it can to help the garbage collector reclaim memory.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Mar  2 02:31:23 2006
@@ -81,7 +81,7 @@
     private BrokerId brokerId;
     private String brokerName;
     private Map clientIdSet = new HashMap(); // we will synchronize access
-    private PersistenceAdapter adaptor;
+    protected  PersistenceAdapter adaptor;
 
     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
         this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);