You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/02 15:31:06 UTC

svn commit: r382393 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx: DurableSubscriptionView.java InactiveDurableSubscriptionView.java ManagedRegionBroker.java SubscriptionView.java

Author: rajdavies
Date: Thu Mar  2 06:31:05 2006
New Revision: 382393

URL: http://svn.apache.org/viewcvs?rev=382393&view=rev
Log:
added support for browsing messages for a durable subscriber

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Thu Mar  2 06:31:05 2006
@@ -22,14 +22,16 @@
  */
 public class DurableSubscriptionView extends SubscriptionView implements  DurableSubscriptionViewMBean {
     
+    protected ManagedRegionBroker broker;
     protected String subscriptionName;
     /**
      * Constructor
      * @param clientId
      * @param sub
      */
-    public DurableSubscriptionView(String clientId,Subscription sub){
+    public DurableSubscriptionView(ManagedRegionBroker broker,String clientId,Subscription sub){
         super(clientId,sub);
+        this.broker = broker;
         this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
     }
     
@@ -47,7 +49,7 @@
      * @throws OpenDataException
      */
     public CompositeData[] browse() throws OpenDataException{
-        return null;
+        return broker.browse(this);
     }
 
     /**
@@ -57,6 +59,10 @@
      * @throws OpenDataException
      */
     public TabularData browseAsTable() throws OpenDataException{
-        return null;
+        return broker.browseAsTable(this);
+    }
+    
+    public String toString(){
+        return "InactiveDurableSubscriptionView: " + getClientId()  + ":" +  getSubscriptionName();
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Thu Mar  2 06:31:05 2006
@@ -21,10 +21,19 @@
  * @version $Revision: 1.5 $
  */
 public class InactiveDurableSubscriptionView extends SubscriptionView implements  DurableSubscriptionViewMBean {
-    
+    protected ManagedRegionBroker broker;
     protected SubscriptionInfo info;
-    public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
+    
+    
+    /**
+     * Constructor
+     * @param broker
+     * @param clientId
+     * @param sub
+     */
+    public InactiveDurableSubscriptionView(ManagedRegionBroker broker,String clientId,SubscriptionInfo sub){
         super(clientId,null);
+        this.broker = broker;
         this.info = sub;
     }
     
@@ -87,7 +96,7 @@
      * @throws OpenDataException
      */
     public CompositeData[] browse() throws OpenDataException{
-        return null;
+        return broker.browse(this);
     }
 
     /**
@@ -97,6 +106,10 @@
      * @throws OpenDataException
      */
     public TabularData browseAsTable() throws OpenDataException{
-        return null;
+        return broker.browseAsTable(this);
+    }
+    
+    public String toString(){
+        return "InactiveDurableSubscriptionView: " + getClientId()  + ":" +  getSubscriptionName();
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar  2 06:31:05 2006
@@ -14,17 +14,27 @@
 package org.apache.activemq.broker.jmx;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
@@ -34,10 +44,13 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -46,6 +59,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 public class ManagedRegionBroker extends RegionBroker{
     private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
     private final MBeanServer mbeanServer;
@@ -139,7 +153,7 @@
             ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
             SubscriptionView view;
             if(sub.getConsumerInfo().isDurable()){
-                view=new DurableSubscriptionView(context.getClientId(),sub);
+                view=new DurableSubscriptionView(this,context.getClientId(),sub);
             }else{
                 view=new SubscriptionView(context.getClientId(),sub);
             }
@@ -275,13 +289,60 @@
         map.put("active", "false");
         try{
             ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
-            SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info);
+            SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
             mbeanServer.registerMBean(view,objectName);
             inactiveDurableTopicSubscribers.put(objectName,view);
             subscriptionKeys.put(key, objectName);
         }catch(Exception e){
             log.error("Failed to register subscription "+info,e);
         }
+    }
+    
+    public CompositeData[] browse(SubscriptionView view) throws OpenDataException{
+        List messages = getSubscriberMessages(view);
+        CompositeData c[]=new CompositeData[messages.size()];
+        for(int i=0;i<c.length;i++){
+            try{
+                c[i]=OpenTypeSupport.convert((Message) messages.get(i));
+            }catch(Throwable e){
+                e.printStackTrace();
+            }
+        }
+        return c;
+    }
+
+    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{
+        OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
+        List messages = getSubscriberMessages(view);
+        CompositeType ct=factory.getCompositeType();
+        TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" });
+        TabularDataSupport rc=new TabularDataSupport(tt);
+        for(int i=0;i<messages.size();i++){
+            rc.put(new CompositeDataSupport(ct,factory.getFields(messages.get(i))));
+        }
+        return rc;
+    }
+    
+    protected List getSubscriberMessages(SubscriptionView view){
+        final List result = new ArrayList();
+        try {
+        ActiveMQTopic  topic = new ActiveMQTopic(view.getDestinationName());
+        TopicMessageStore store = adaptor.createTopicMessageStore(topic);
+       
+       
+            store.recover(new MessageRecoveryListener(){
+                public void recoverMessage(Message message) throws Throwable{
+                    result.add(message);
+                }
+
+                public void recoverMessageReference(String messageReference) throws Throwable{}
+
+                public void finished(){}
+            });
+        }catch(Throwable e){
+            log.error("Failed to browse messages for Subscription " + view,e);
+        }
+        return result;
     }
     
     protected  ObjectName[] getTopics(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Thu Mar  2 06:31:05 2006
@@ -171,5 +171,12 @@
     protected ConsumerInfo getConsumerInfo(){
         return subscription != null ? subscription.getConsumerInfo() : null;
     }
+    
+    /**
+     *@return pretty print
+     */
+    public String toString(){
+        return "SubscriptionView: " + getClientId()  + ":" +  getConnectionId();
+    }
 
 }