You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/02 15:31:06 UTC
svn commit: r382393 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx:
DurableSubscriptionView.java InactiveDurableSubscriptionView.java
ManagedRegionBroker.java SubscriptionView.java
Author: rajdavies
Date: Thu Mar 2 06:31:05 2006
New Revision: 382393
URL: http://svn.apache.org/viewcvs?rev=382393&view=rev
Log:
added support for browsing messages for a durable subscriber
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Thu Mar 2 06:31:05 2006
@@ -22,14 +22,16 @@
*/
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
+ protected ManagedRegionBroker broker;
protected String subscriptionName;
/**
* Constructor
* @param clientId
* @param sub
*/
- public DurableSubscriptionView(String clientId,Subscription sub){
+ public DurableSubscriptionView(ManagedRegionBroker broker,String clientId,Subscription sub){
super(clientId,sub);
+ this.broker = broker;
this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
}
@@ -47,7 +49,7 @@
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException{
- return null;
+ return broker.browse(this);
}
/**
@@ -57,6 +59,10 @@
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException{
- return null;
+ return broker.browseAsTable(this);
+ }
+
+ public String toString(){
+ return "DurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName(); // label names this class, not the inactive variant
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Thu Mar 2 06:31:05 2006
@@ -21,10 +21,19 @@
* @version $Revision: 1.5 $
*/
public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
-
+ protected ManagedRegionBroker broker;
protected SubscriptionInfo info;
- public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
+
+
+ /**
+ * Constructor
+ * @param broker
+ * @param clientId
+ * @param sub
+ */
+ public InactiveDurableSubscriptionView(ManagedRegionBroker broker,String clientId,SubscriptionInfo sub){
super(clientId,null);
+ this.broker = broker;
this.info = sub;
}
@@ -87,7 +96,7 @@
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException{
- return null;
+ return broker.browse(this);
}
/**
@@ -97,6 +106,10 @@
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException{
- return null;
+ return broker.browseAsTable(this);
+ }
+
+ public String toString(){
+ return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar 2 06:31:05 2006
@@ -14,17 +14,27 @@
package org.apache.activemq.broker.jmx;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
@@ -34,10 +44,13 @@
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -46,6 +59,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@@ -139,7 +153,7 @@
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){
- view=new DurableSubscriptionView(context.getClientId(),sub);
+ view=new DurableSubscriptionView(this,context.getClientId(),sub);
}else{
view=new SubscriptionView(context.getClientId(),sub);
}
@@ -275,13 +289,60 @@
map.put("active", "false");
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
- SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info);
+ SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key, objectName);
}catch(Exception e){
log.error("Failed to register subscription "+info,e);
}
+ }
+
+ public CompositeData[] browse(SubscriptionView view) throws OpenDataException{ // messages from getSubscriberMessages(view), converted to open-type data
+ List messages = getSubscriberMessages(view);
+ CompositeData c[]=new CompositeData[messages.size()];
+ for(int i=0;i<c.length;i++){
+ try{
+ c[i]=OpenTypeSupport.convert((Message) messages.get(i));
+ }catch(Throwable e){
+ log.error("Failed to convert message for Subscription " + view,e); // use the class logger, as getSubscriberMessages does; c[i] stays null for this entry
+ }
+ }
+ return c;
+ }
+
+ public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{
+ OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
+ List messages = getSubscriberMessages(view);
+ CompositeType ct=factory.getCompositeType();
+ TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" });
+ TabularDataSupport rc=new TabularDataSupport(tt);
+ for(int i=0;i<messages.size();i++){
+ rc.put(new CompositeDataSupport(ct,factory.getFields(messages.get(i))));
+ }
+ return rc;
+ }
+
+ protected List getSubscriberMessages(SubscriptionView view){
+ final List result = new ArrayList();
+ try {
+ ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
+ TopicMessageStore store = adaptor.createTopicMessageStore(topic);
+
+
+ store.recover(new MessageRecoveryListener(){
+ public void recoverMessage(Message message) throws Throwable{
+ result.add(message);
+ }
+
+ public void recoverMessageReference(String messageReference) throws Throwable{}
+
+ public void finished(){}
+ });
+ }catch(Throwable e){
+ log.error("Failed to browse messages for Subscription " + view,e);
+ }
+ return result;
}
protected ObjectName[] getTopics(){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=382393&r1=382392&r2=382393&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Thu Mar 2 06:31:05 2006
@@ -171,5 +171,12 @@
protected ConsumerInfo getConsumerInfo(){
return subscription != null ? subscription.getConsumerInfo() : null;
}
+
+ /**
+ *@return pretty print
+ */
+ public String toString(){
+ return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
+ }
}