You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/02 23:11:28 UTC

svn commit: r382532 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/util/ test/resources/

Author: chirino
Date: Thu Mar  2 14:11:25 2006
New Revision: 382532

URL: http://svn.apache.org/viewcvs?rev=382532&view=rev
Log:
Added more operations to the JMX beans.
 - broker view can now create and destroy destinations
 - queue view can now copy messages to other destinations.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Mar  2 14:11:25 2006
@@ -866,6 +866,7 @@
 
         if (isUseJmx()) {
             ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
+            managedBroker.setContextBroker(broker);
             BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
             ObjectName objectName = getBrokerObjectName();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Mar  2 14:11:25 2006
@@ -17,8 +17,11 @@
 package org.apache.activemq.broker.jmx;
 
 import javax.management.ObjectName;
+
 import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.memory.UsageManager;
 
 public class BrokerView implements BrokerViewMBean {
@@ -120,5 +123,27 @@
     public ObjectName[] getInactiveDurableTopicSubscribers(){
         return broker.getInactiveDurableTopicSubscribers();
     }
+
+    public void addTopic(String name) throws Throwable {
+        broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name));
+    }
+
+    public void addQueue(String name) throws Throwable {
+        broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name));
+    }
+
+    public void removeTopic(String name) throws Throwable {
+        broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name), 1000);
+    }
+
+    public void removeQueue(String name) throws Throwable {
+        broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
+    }
     
+    static public ConnectionContext getConnectionContext(Broker broker) {
+        ConnectionContext context = new ConnectionContext();
+        context.setBroker(broker);
+        return context;
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Mar  2 14:11:25 2006
@@ -50,5 +50,9 @@
     public ObjectName[] getTemporaryTopicSubscribers();
     public ObjectName[] getTemporaryQueueSubscribers();
     
+    public void addTopic(String name) throws Throwable;
+    public void addQueue(String name) throws Throwable;
+    public void removeTopic(String name) throws Throwable;
+    public void removeQueue(String name) throws Throwable;
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Thu Mar  2 14:11:25 2006
@@ -20,6 +20,7 @@
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -27,8 +28,10 @@
 
 public class DestinationView {
     protected final Destination destination;
+    protected final ManagedRegionBroker broker;
 
-    public DestinationView(Destination destination){
+    public DestinationView(ManagedRegionBroker broker, Destination destination){
+        this.broker = broker;
         this.destination=destination;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar  2 14:11:25 2006
@@ -16,13 +16,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
@@ -32,11 +32,12 @@
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -58,8 +59,8 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 public class ManagedRegionBroker extends RegionBroker{
     private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
     private final MBeanServer mbeanServer;
@@ -76,6 +77,9 @@
     private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
     private final Map subscriptionKeys = new ConcurrentHashMap();
     private final Map subscriptionMap = new ConcurrentHashMap();
+    
+    /* This is the first broker in the broker interceptor chain. */
+    private Broker contextBroker;
 
     public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
                     TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@@ -119,9 +123,9 @@
             ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
             DestinationView view;
             if(destination instanceof Queue){
-                view=new QueueView((Queue) destination);
+                view=new QueueView(this, (Queue) destination);
             }else{
-                view=new TopicView((Topic) destination);
+                view=new TopicView(this, (Topic) destination);
             }
             registerDestination(destObjectName,destName,view);
         }catch(Exception e){
@@ -386,5 +390,13 @@
     protected ObjectName[] getInactiveDurableTopicSubscribers(){
         Set set = inactiveDurableTopicSubscribers.keySet();
         return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+
+    public Broker getContextBroker() {
+        return contextBroker;
+    }
+
+    public void setContextBroker(Broker contextBroker) {
+        this.contextBroker = contextBroker;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Thu Mar  2 14:11:25 2006
@@ -16,10 +16,11 @@
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 public class QueueView extends DestinationView implements QueueViewMBean{
-    public QueueView(Queue destination){
-        super(destination);
+    public QueueView(ManagedRegionBroker broker, Queue destination){
+        super(broker, destination);
     }
 
     public CompositeData getMessage(String messageId) throws OpenDataException{
@@ -35,5 +36,9 @@
 
     public void purge(){
         ((Queue) destination).purge();
+    }
+
+    public boolean copyMessageTo(String messageId, String destinationName) throws Throwable {
+        return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE));
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Thu Mar  2 14:11:25 2006
@@ -39,4 +39,5 @@
     public void removeMessage(String messageId);
     public void purge();
 
+    public boolean copyMessageTo(String messageId, String destinationName) throws Throwable;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java Thu Mar  2 14:11:25 2006
@@ -16,7 +16,7 @@
 import org.apache.activemq.broker.region.Topic;
 public class TopicView extends DestinationView implements TopicViewMBean{
     
-    public TopicView(Topic destination){
-        super(destination);
+    public TopicView(ManagedRegionBroker broker, Topic destination){
+        super(broker, destination);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Mar  2 14:11:25 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import javax.jms.InvalidSelectorException;
@@ -168,23 +169,10 @@
                             // The original destination and transaction id do not get filled when the message is first
                             // sent,
                             // it is only populated if the message is routed to another destination like the DLQ
-                            if(message.getOriginalDestination()!=null)
-                                message.setOriginalDestination(message.getDestination());
-                            if(message.getOriginalTransactionId()!=null)
-                                message.setOriginalTransactionId(message.getTransactionId());
                             DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
-                            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
-                                            .getDestination());
-                            message.setDestination(deadLetterDestination);
-                            message.setTransactionId(null);
-                            message.evictMarshlledForm();
-                            boolean originalFlowControl=context.isProducerFlowControl();
-                            try{
-                                context.setProducerFlowControl(false);
-                                context.getBroker().send(context,message);
-                            }finally{
-                                context.setProducerFlowControl(originalFlowControl);
-                            }
+                            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
+                            BrokerSupport.resend(context, message, deadLetterDestination);
+
                         }
                     }finally{
                         node.decrementReferenceCount();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Mar  2 14:11:25 2006
@@ -42,6 +42,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -498,6 +499,28 @@
                 }
             }
         }
+    }
+
+    public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Throwable {
+        synchronized (messages) {
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                try {
+                    MessageReference r = (MessageReference) iter.next();
+                    if (messageId.equals(r.getMessageId().toString())) {
+                        r.incrementReferenceCount();
+                        try {
+                            Message m = r.getMessage();
+                            BrokerSupport.resend(context, m, dest);                            
+                        } finally {
+                            r.decrementReferenceCount();
+                        }
+                        break;
+                    }
+                } catch (IOException e) {
+                }
+            }
+        }
+        return false;
     }
 
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=382532&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Thu Mar  2 14:11:25 2006
@@ -0,0 +1,32 @@
+package org.apache.activemq.util;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+
+public class BrokerSupport {
+    
+    /**
+     * @param context
+     * @param message
+     * @param deadLetterDestination
+     * @throws Throwable
+     */
+    static public void resend(final ConnectionContext context, Message message, ActiveMQDestination deadLetterDestination) throws Throwable {
+        if(message.getOriginalDestination()!=null)
+            message.setOriginalDestination(message.getDestination());
+        if(message.getOriginalTransactionId()!=null)
+            message.setOriginalTransactionId(message.getTransactionId());                            
+        message.setDestination(deadLetterDestination);
+        message.setTransactionId(null);
+        message.evictMarshlledForm();
+        boolean originalFlowControl=context.isProducerFlowControl();
+        try{
+            context.setProducerFlowControl(false);
+            context.getBroker().send(context,message);
+        }finally{
+            context.setProducerFlowControl(originalFlowControl);
+        }
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Mar  2 14:11:25 2006
@@ -1,7 +1,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
 
 log4j.logger.org.apache.activemq.spring=WARN