You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/02 23:11:28 UTC
svn commit: r382532 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/util/ test/resources/
Author: chirino
Date: Thu Mar 2 14:11:25 2006
New Revision: 382532
URL: http://svn.apache.org/viewcvs?rev=382532&view=rev
Log:
Added more operations to the JMX beans.
- broker view can now create and destroy destinations
- queue view can now copy messages to other destinations.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Mar 2 14:11:25 2006
@@ -866,6 +866,7 @@
if (isUseJmx()) {
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
+ managedBroker.setContextBroker(broker);
BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
ObjectName objectName = getBrokerObjectName();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Mar 2 14:11:25 2006
@@ -17,8 +17,11 @@
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
+
import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
public class BrokerView implements BrokerViewMBean {
@@ -120,5 +123,27 @@
public ObjectName[] getInactiveDurableTopicSubscribers(){
return broker.getInactiveDurableTopicSubscribers();
}
+
+ public void addTopic(String name) throws Throwable {
+ broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name));
+ }
+
+ public void addQueue(String name) throws Throwable {
+ broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name));
+ }
+
+ public void removeTopic(String name) throws Throwable {
+ broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name), 1000);
+ }
+
+ public void removeQueue(String name) throws Throwable {
+ broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
+ }
+ static public ConnectionContext getConnectionContext(Broker broker) {
+ ConnectionContext context = new ConnectionContext();
+ context.setBroker(broker);
+ return context;
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Mar 2 14:11:25 2006
@@ -50,5 +50,9 @@
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();
+ public void addTopic(String name) throws Throwable;
+ public void addQueue(String name) throws Throwable;
+ public void removeTopic(String name) throws Throwable;
+ public void removeQueue(String name) throws Throwable;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Thu Mar 2 14:11:25 2006
@@ -20,6 +20,7 @@
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -27,8 +28,10 @@
public class DestinationView {
protected final Destination destination;
+ protected final ManagedRegionBroker broker;
- public DestinationView(Destination destination){
+ public DestinationView(ManagedRegionBroker broker, Destination destination){
+ this.broker = broker;
this.destination=destination;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar 2 14:11:25 2006
@@ -16,13 +16,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
@@ -32,11 +32,12 @@
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@@ -58,8 +59,8 @@
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@@ -76,6 +77,9 @@
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
private final Map subscriptionKeys = new ConcurrentHashMap();
private final Map subscriptionMap = new ConcurrentHashMap();
+
+ /* This is the first broker in the broker interceptor chain. */
+ private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@@ -119,9 +123,9 @@
ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
DestinationView view;
if(destination instanceof Queue){
- view=new QueueView((Queue) destination);
+ view=new QueueView(this, (Queue) destination);
}else{
- view=new TopicView((Topic) destination);
+ view=new TopicView(this, (Topic) destination);
}
registerDestination(destObjectName,destName,view);
}catch(Exception e){
@@ -386,5 +390,13 @@
protected ObjectName[] getInactiveDurableTopicSubscribers(){
Set set = inactiveDurableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ }
+
+ public Broker getContextBroker() {
+ return contextBroker;
+ }
+
+ public void setContextBroker(Broker contextBroker) {
+ this.contextBroker = contextBroker;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Thu Mar 2 14:11:25 2006
@@ -16,10 +16,11 @@
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
public class QueueView extends DestinationView implements QueueViewMBean{
- public QueueView(Queue destination){
- super(destination);
+ public QueueView(ManagedRegionBroker broker, Queue destination){
+ super(broker, destination);
}
public CompositeData getMessage(String messageId) throws OpenDataException{
@@ -35,5 +36,9 @@
public void purge(){
((Queue) destination).purge();
+ }
+
+ public boolean copyMessageTo(String messageId, String destinationName) throws Throwable {
+ return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE));
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Thu Mar 2 14:11:25 2006
@@ -39,4 +39,5 @@
public void removeMessage(String messageId);
public void purge();
+ public boolean copyMessageTo(String messageId, String destinationName) throws Throwable;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java Thu Mar 2 14:11:25 2006
@@ -16,7 +16,7 @@
import org.apache.activemq.broker.region.Topic;
public class TopicView extends DestinationView implements TopicViewMBean{
- public TopicView(Topic destination){
- super(destination);
+ public TopicView(ManagedRegionBroker broker, Topic destination){
+ super(broker, destination);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Mar 2 14:11:25 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
@@ -168,23 +169,10 @@
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
- if(message.getOriginalDestination()!=null)
- message.setOriginalDestination(message.getDestination());
- if(message.getOriginalTransactionId()!=null)
- message.setOriginalTransactionId(message.getTransactionId());
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
- ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
- .getDestination());
- message.setDestination(deadLetterDestination);
- message.setTransactionId(null);
- message.evictMarshlledForm();
- boolean originalFlowControl=context.isProducerFlowControl();
- try{
- context.setProducerFlowControl(false);
- context.getBroker().send(context,message);
- }finally{
- context.setProducerFlowControl(originalFlowControl);
- }
+ ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
+ BrokerSupport.resend(context, message, deadLetterDestination);
+
}
}finally{
node.decrementReferenceCount();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Mar 2 14:11:25 2006
@@ -42,6 +42,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -498,6 +499,28 @@
}
}
}
+ }
+
+ public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Throwable {
+ synchronized (messages) {
+ for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ try {
+ MessageReference r = (MessageReference) iter.next();
+ if (messageId.equals(r.getMessageId().toString())) {
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ } finally {
+ r.decrementReferenceCount();
+ }
+ break;
+ }
+ } catch (IOException e) {
+ }
+ }
+ }
+ return false;
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=382532&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Thu Mar 2 14:11:25 2006
@@ -0,0 +1,32 @@
+package org.apache.activemq.util;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+
+public class BrokerSupport {
+
+ /**
+ * @param context
+ * @param message
+ * @param deadLetterDestination
+ * @throws Throwable
+ */
+ static public void resend(final ConnectionContext context, Message message, ActiveMQDestination deadLetterDestination) throws Throwable {
+ if(message.getOriginalDestination()!=null)
+ message.setOriginalDestination(message.getDestination());
+ if(message.getOriginalTransactionId()!=null)
+ message.setOriginalTransactionId(message.getTransactionId());
+ message.setDestination(deadLetterDestination);
+ message.setTransactionId(null);
+ message.evictMarshlledForm();
+ boolean originalFlowControl=context.isProducerFlowControl();
+ try{
+ context.setProducerFlowControl(false);
+ context.getBroker().send(context,message);
+ }finally{
+ context.setProducerFlowControl(originalFlowControl);
+ }
+ }
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=382532&r1=382531&r2=382532&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Mar 2 14:11:25 2006
@@ -1,7 +1,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
log4j.logger.org.apache.activemq.spring=WARN