You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/01/05 18:09:45 UTC

svn commit: r366220 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/

Author: chirino
Date: Thu Jan  5 09:09:05 2006
New Revision: 366220

URL: http://svn.apache.org/viewcvs?rev=366220&view=rev
Log:
- Refactored out introspection-based toString() logic to the IntrospectionSupport class
- Durable subscriptions are now eagerly loaded when a topic is created.
- Fixed the *TransactionTest 

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jan  5 09:09:05 2006
@@ -31,6 +31,8 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
@@ -40,6 +42,8 @@
  */
 abstract public class AbstractRegion implements Region {
     
+    private static final Log log = LogFactory.getLog(AbstractRegion.class);
+
     protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
     protected final DestinationMap destinationMap = new DestinationMap();
     protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
@@ -57,7 +61,14 @@
         this.persistenceAdapter = persistenceAdapter;
     }
 
+    public void start() throws Exception {
+    }
+    
+    public void stop() throws Exception {
+    }
+    
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+        log.debug("Adding destination: "+destination);
         Destination dest = createDestination(destination);
         dest.start();
         synchronized(destinationsMutex){
@@ -86,6 +97,7 @@
             }
         }
 
+        log.debug("Removing destination: "+destination);
         synchronized(destinationsMutex){
             Destination dest=(Destination) destinations.remove(destination);
             if(dest==null)

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Thu Jan  5 09:09:05 2006
@@ -36,7 +36,7 @@
 
 abstract public class AbstractSubscription implements Subscription {
     
-    protected final Log log;
+    static private final Log log = LogFactory.getLog(AbstractSubscription.class);
     
     protected ConnectionContext context;
     protected ConsumerInfo info;
@@ -50,7 +50,6 @@
         this.info = info;
         this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
         this.selector = parseSelector(info);
-        this.log = LogFactory.getLog(getClass().getName()+"."+info.getConsumerId());
     }
     
     static private BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Jan  5 09:09:05 2006
@@ -26,6 +26,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.SubscriptionInfo;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
@@ -44,6 +45,22 @@
         this.subscriptionName = info.getSubcriptionName();
     }
     
+    public DurableTopicSubscription(SubscriptionInfo info) throws InvalidSelectorException {
+        super(null, createFakeConsumerInfo(info));
+        this.clientId = info.getClientId();
+        this.subscriptionName = info.getSubcriptionName();
+        active=false;
+        recovered=false;        
+    }
+
+    private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
+        ConsumerInfo rc = new ConsumerInfo();
+        rc.setSelector(info.getSelector());
+        rc.setSubcriptionName(info.getSubcriptionName());
+        rc.setDestination(info.getDestination());
+        return rc;
+    }
+
     synchronized public boolean isActive() {
         return active;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Jan  5 09:09:05 2006
@@ -25,6 +25,8 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -39,6 +41,7 @@
  * @version $Revision: 1.15 $
  */
 abstract public class PrefetchSubscription extends AbstractSubscription {
+    static private final Log log = LogFactory.getLog(PrefetchSubscription.class);
     
     final protected LinkedList matched = new LinkedList();
     final protected LinkedList dispatched = new LinkedList();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Thu Jan  5 09:09:05 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
@@ -25,16 +26,16 @@
 
 /**
  * A Region is used to implement the different QOS options available to 
- * a broker.  A Broker is composed of multiple mesasge processing Regions that
+ * a broker.  A Broker is composed of multiple message processing Regions that
  * provide different QOS options.
  * 
  * @version $Revision$
  */
-public interface Region {
+public interface Region extends Service {
 
     /**
      * Used to create a destination.  Usually, this method is invoked as a side-effect of sending
-     * a message to a destiantion that does not exist yet.
+     * a message to a destination that does not exist yet.
      * 
      * @param context
      * @param destination the destination to create.
@@ -43,11 +44,11 @@
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable;
     
     /**
-     * Used to destory a destination.  
-     * This shoud try to quiesce use of the destination up to the timeout alotted time before removing the destination.
+     * Used to destroy a destination.  
+     * This should try to quiesce use of the destination up to the timeout allotted time before removing the destination.
      * This will remove all persistent messages associated with the destination.
      * 
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      * @param destination what is being removed from the broker.
      * @param timeout the max amount of time to wait for the destination to quiesce
      */
@@ -55,19 +56,19 @@
 
     /**
      * Adds a consumer.
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      */
     public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable;
 
     /**
      * Removes a consumer.
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      */
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable;
 
     /**
      * Deletes a durable subscription.
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      * @param info TODO
      */
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable;
@@ -76,13 +77,13 @@
      * Send a message to the broker to using the specified destination.  The destination specified
      * in the message does not need to match the destination the message is sent to.  This is 
      * handy in case the message is being sent to a dead letter destination.
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      */
     public void send(ConnectionContext context, Message message) throws Throwable;
     
     /**
      * Used to acknowledge the receipt of a message by a client.
-     * @param context the enviorment the operation is being executed under.
+     * @param context the environment the operation is being executed under.
      */
     public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable;
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Jan  5 09:09:05 2006
@@ -39,6 +39,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.ServiceStopper;
 
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
@@ -108,9 +109,19 @@
     
     
     public void start() throws Exception {
+        queueRegion.start();
+        topicRegion.start();
+        tempQueueRegion.start();
+        tempTopicRegion.start();
     }
 
     public void stop() throws Exception {
+        ServiceStopper ss = new ServiceStopper();
+        ss.stop(queueRegion);
+        ss.stop(topicRegion);
+        ss.stop(tempQueueRegion);
+        ss.stop(tempTopicRegion);
+        ss.throwFirstException();
     }
 
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Thu Jan  5 09:09:05 2006
@@ -25,11 +25,14 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -42,7 +45,8 @@
  * @version $Revision: 1.12 $
  */
 public class TopicRegion extends AbstractRegion {
-
+    private static final Log log = LogFactory.getLog(TopicRegion.class);
+    
     protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
     private final PolicyMap policyMap;
 
@@ -78,7 +82,8 @@
                 }
                 else {
                     // Change the consumer id key of the durable sub.
-                    subscriptions.remove(sub.getConsumerInfo().getConsumerId());
+                    if( sub.getConsumerInfo().getConsumerId()!=null )
+                        subscriptions.remove(sub.getConsumerInfo().getConsumerId());
                     subscriptions.put(info.getConsumerId(), sub);
                     sub.activate(context, info);
                 }
@@ -136,6 +141,16 @@
         TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
         Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
         configureTopic(topic, destination);
+        
+        // Eagerly recover the durable subscriptions
+        if (store != null) {            
+            SubscriptionInfo[] infos = store.getAllSubscriptions();
+            for (int i = 0; i < infos.length; i++) {
+                log.info("Restoring durable subscription: "+infos[i]);
+                createDurableSubscription(infos[i]);
+            }            
+        }
+        
         return topic;
     }
 
@@ -165,6 +180,15 @@
             return new TopicSubscription(context, info, memoryManager);
         }
     }
+    
+    public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
+        SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
+        DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+        sub = new DurableTopicSubscription(info);
+        durableSubscriptions.put(key, sub);
+        return sub;
+    }
+    
 
     /**
      */

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Thu Jan  5 09:09:05 2006
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.command;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  * 
@@ -87,50 +84,7 @@
     }
 
     public String toString() {
-        LinkedHashMap map = new LinkedHashMap();
-        addFields(map, getClass());
-        return simpleName(getClass())+" "+map;
+        return IntrospectionSupport.toString(this, BaseCommand.class);
     }
-
-    public static String simpleName(Class clazz) {
-        String name = clazz.getName();
-        int p = name.lastIndexOf(".");
-        if( p >= 0 ) {
-            name = name.substring(p+1);
-        }
-        return name;
-    }
-    
-
-    private void addFields(LinkedHashMap map, Class clazz) {
-        
-        if( clazz!=BaseCommand.class ) 
-            addFields( map, clazz.getSuperclass() );
-        
-        Field[] fields = clazz.getDeclaredFields();
-        for (int i = 0; i < fields.length; i++) {
-            Field field = fields[i];
-            if( Modifier.isStatic(field.getModifiers()) || 
-                Modifier.isTransient(field.getModifiers()) ||
-                Modifier.isPrivate(field.getModifiers())  ) {
-                continue;
-            }
-            
-            try {
-                Object o = field.get(this);
-                if( o!=null && o.getClass().isArray() ) {
-                    try {
-                        o = Arrays.asList((Object[]) o);
-                    } catch (Throwable e) {
-                    }
-                }
-                map.put(field.getName(), o);
-            } catch (Throwable e) {
-                e.printStackTrace();
-            }
-        }
-        
-    }
-
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Thu Jan  5 09:09:05 2006
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.util.IntrospectionSupport;
+
 
 /**
  * 
@@ -82,4 +84,9 @@
     public boolean isMarshallAware() {
         return false;
     }
+    
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Thu Jan  5 09:09:05 2006
@@ -18,11 +18,15 @@
 
 import java.beans.PropertyEditor;
 import java.beans.PropertyEditorManager;
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -136,6 +140,57 @@
         if( clazz == URI.class )
             return true;
         return false;
+    }
+
+    static public String toString(Object target) {
+        return toString(target, Object.class);
+    }
+
+    static public String toString(Object target, Class stopClass) {
+        LinkedHashMap map = new LinkedHashMap();
+        addFields(target, target.getClass(), stopClass, map);
+        return simpleName(target.getClass())+" "+map;
+    }
+
+    static public String simpleName(Class clazz) {
+        String name = clazz.getName();
+        int p = name.lastIndexOf(".");
+        if( p >= 0 ) {
+            name = name.substring(p+1);
+        }
+        return name;
+    }
+    
+
+    static private void addFields(Object target, Class startClass, Class stopClass, LinkedHashMap map) {
+        
+        if( startClass!=stopClass ) 
+            addFields( target, startClass.getSuperclass(), stopClass, map );
+        
+        Field[] fields = startClass.getDeclaredFields();
+        for (int i = 0; i < fields.length; i++) {
+            Field field = fields[i];
+            if( Modifier.isStatic(field.getModifiers()) || 
+                Modifier.isTransient(field.getModifiers()) ||
+                Modifier.isPrivate(field.getModifiers())  ) {
+                continue;
+            }
+            
+            try {
+                field.setAccessible(true);
+                Object o = field.get(target);
+                if( o!=null && o.getClass().isArray() ) {
+                    try {
+                        o = Arrays.asList((Object[]) o);
+                    } catch (Throwable e) {
+                    }
+                }
+                map.put(field.getName(), o);
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+        
     }
 
     

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=366220&r1=366219&r2=366220&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java Thu Jan  5 09:09:05 2006
@@ -18,6 +18,8 @@
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.test.JmsResourceProvider;
 import org.apache.activemq.test.TestSupport;
 
@@ -31,6 +33,9 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.MessageListener;
+
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -57,6 +62,8 @@
     private List ackMessages = new ArrayList(messageCount);
     private boolean resendPhase = false;
 
+    private BrokerService broker;
+
     public JmsTransactionTestSupport() {
         super();
     }
@@ -70,7 +77,9 @@
      * @see junit.framework.TestCase#setUp()
      */
     protected void setUp() throws Exception {
-        super.setUp();
+        broker = createBroker();
+        broker.start();
+        
         resourceProvider = getJmsResourceProvider();
         topic = resourceProvider.isTopic();
         // We will be using transacted sessions.
@@ -79,17 +88,21 @@
         reconnect();
     }
 
+    /**
+     */
+    protected BrokerService createBroker() throws Exception, URISyntaxException {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
     /* (non-Javadoc)
      * @see junit.framework.TestCase#tearDown()
      */
     protected void tearDown() throws Exception {
-        //TODO
-        //log.info("Test Done.  Stats");
-        //((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
         log.info("Closing down connection");
 
         session.close();
         connection.close();
+        broker.stop();
         log.info("Connection closed.");
     }