You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/22 18:15:06 UTC

svn commit: r688110 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/protocol/ systests/ systests/src/main/java/org/apache/qpid/server/store/ systests/src/main/java/org/apache/qpid/test/client/timeouts/ systests/src/mai...

Author: ritchiem
Date: Fri Aug 22 09:15:02 2008
New Revision: 688110

URL: http://svn.apache.org/viewvc?rev=688110&view=rev
Log:
QPID-1119 : M2x commit : Addition of a System property to AMQProtocolHandler.java to allow the syncWait default to be changed. To perform this a new SlowMessageStore has been added to the systest package. This allows all MessageStore methods to have a pre and/or post delay applied. This delay can be configured dynamically if you have a handle to the Store or via the XML configuration. The SlowMessageStore can also be used to wrap any existing MessageStore (Testing only carried out with the default MemoryMessageStore)
    To make testing easier on M2x VMTestCase has been modified to allow the test to simply configure logging levels and systemProperties. These are then reverted after the test has completed.

    These changes will naturally need more work to before they are merged to trunk which uses totally different methods for ClientProperties and for running tests.

    systests/pom.xml didn't have amqj.logging.level as a systemProperty hence setting it did nothing for the tests.

Added:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/systests/pom.xml
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=688110&r1=688109&r2=688110&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Aug 22 09:15:02 2008
@@ -162,7 +162,7 @@
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
-    private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
 
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.

Modified: incubator/qpid/trunk/qpid/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/pom.xml?rev=688110&r1=688109&r2=688110&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/systests/pom.xml Fri Aug 22 09:15:02 2008
@@ -96,6 +96,10 @@
                             <name>QPID_HOME</name>
                             <value>${basedir}/${topDirectoryLocation}/broker</value>
                         </property>                        
+                        <property>
+                            <name>amqj.logging.level</name>
+                            <value>${amqj.logging.level}</value>
+                        </property>
                     </systemProperties>
 
                     <excludes>

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=688110&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Fri Aug 22 09:15:02 2008
@@ -0,0 +1,282 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+public class SlowMessageStore implements MessageStore
+{
+    private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
+    private static final String DELAYS = "delays";
+    private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
+    private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
+    private long _defaultDelay = 0L;
+    private MessageStore _realStore = new MemoryMessageStore();
+    private static final String PRE = "pre";
+    private static final String POST = "post";
+    private String DEFAULT_DELAY = "default";
+
+    public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+    {
+        Configuration delays = config.subset(base + "." + DELAYS);
+
+        configureDelays(delays);
+
+        String messageStoreClass = config.getString(base + ".store.class");
+
+        if (delays.containsKey(DEFAULT_DELAY))
+        {
+            _defaultDelay = delays.getLong(DEFAULT_DELAY);
+        }
+
+        if (messageStoreClass != null)
+        {
+            Class clazz = Class.forName(messageStoreClass);
+
+            Object o = clazz.newInstance();
+
+            if (!(o instanceof MessageStore))
+            {
+                throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+                                             " does not.");
+            }
+            _realStore = (MessageStore) o;
+            _realStore.configure(virtualHost, base + ".store", config);
+        }
+        else
+        {
+            _realStore.configure(virtualHost, base + ".store", config);
+        }
+    }
+
+    private void configureDelays(Configuration config)
+    {
+        Iterator delays = config.getKeys();
+
+        while (delays.hasNext())
+        {
+            String key = (String) delays.next();
+            if (key.endsWith(PRE))
+            {
+                _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key));
+            }
+            else if (key.endsWith(POST))
+            {
+                _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key));
+            }
+        }
+    }
+
+    private void doPostDelay(String method)
+    {
+        long delay = lookupDelay(_postDelays, method);
+        doDelay(delay);
+    }
+
+    private void doPreDelay(String method)
+    {
+        long delay = lookupDelay(_preDelays, method);
+        doDelay(delay);
+    }
+
+    private long lookupDelay(HashMap<String, Long> delays, String method)
+    {
+        Long delay = delays.get(method);
+        return (delay == null) ? _defaultDelay : delay;
+    }
+
+    private void doDelay(long delay)
+    {
+        if (delay > 0)
+        {
+            try
+            {
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException e)
+            {
+                _logger.warn("Interrupted : " + e);
+            }
+        }
+    }
+
+    // ***** MessageStore Interface.
+
+    public void close() throws Exception
+    {
+        doPreDelay("close");
+        _realStore.close();
+        doPostDelay("close");
+    }
+
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+    {
+        doPreDelay("removeMessage");
+        _realStore.removeMessage(storeContext, messageId);
+        doPostDelay("removeMessage");
+    }
+
+    public void createExchange(Exchange exchange) throws AMQException
+    {
+        doPreDelay("createExchange");
+        _realStore.createExchange(exchange);
+        doPostDelay("createExchange");
+    }
+
+    public void removeExchange(Exchange exchange) throws AMQException
+    {
+        doPreDelay("removeExchange");
+        _realStore.removeExchange(exchange);
+        doPostDelay("removeExchange");
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        doPreDelay("bindQueue");
+        _realStore.bindQueue(exchange, routingKey, queue, args);
+        doPostDelay("bindQueue");
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        doPreDelay("unbindQueue");
+        _realStore.unbindQueue(exchange, routingKey, queue, args);
+        doPostDelay("unbindQueue");
+    }
+
+    public void createQueue(AMQQueue queue) throws AMQException
+    {
+        createQueue(queue, null);
+    }
+
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+    {
+        doPreDelay("createQueue");
+        _realStore.createQueue(queue, arguments);
+        doPostDelay("createQueue");
+    }
+
+    public void removeQueue(AMQQueue queue) throws AMQException
+    {
+        doPreDelay("removeQueue");
+        _realStore.removeQueue(queue);
+        doPostDelay("removeQueue");
+    }
+
+    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        doPreDelay("enqueueMessage");
+        _realStore.enqueueMessage(context, queue, messageId);
+        doPostDelay("enqueueMessage");
+    }
+
+    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
+        doPreDelay("dequeueMessage");
+        _realStore.dequeueMessage(context, queue, messageId);
+        doPostDelay("dequeueMessage");
+    }
+
+    public void beginTran(StoreContext context) throws AMQException
+    {
+        doPreDelay("beginTran");
+        _realStore.beginTran(context);
+        doPostDelay("beginTran");
+    }
+
+    public void commitTran(StoreContext context) throws AMQException
+    {
+        doPreDelay("commitTran");
+        _realStore.commitTran(context);
+        doPostDelay("commitTran");
+    }
+
+    public void abortTran(StoreContext context) throws AMQException
+    {
+        doPreDelay("abortTran");
+        _realStore.abortTran(context);
+        doPostDelay("abortTran");
+    }
+
+    public boolean inTran(StoreContext context)
+    {
+        doPreDelay("inTran");
+        boolean b = _realStore.inTran(context);
+        doPostDelay("inTran");
+        return b;
+    }
+
+    public Long getNewMessageId()
+    {
+        doPreDelay("getNewMessageId");
+        Long l = _realStore.getNewMessageId();
+        doPostDelay("getNewMessageId");
+        return l;
+    }
+
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    {
+        doPreDelay("storeContentBodyChunk");
+        _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+        doPostDelay("storeContentBodyChunk");
+    }
+
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    {
+        doPreDelay("storeMessageMetaData");
+        _realStore.storeMessageMetaData(context, messageId, messageMetaData);
+        doPostDelay("storeMessageMetaData");
+    }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        doPreDelay("getMessageMetaData");
+        MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId);
+        doPostDelay("getMessageMetaData");
+        return mmd;
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        doPreDelay("getContentBodyChunk");
+        ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index);
+        doPostDelay("getContentBodyChunk");
+        return c;
+    }
+
+    public boolean isPersistent()
+    {
+        return _realStore.isPersistent();
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java?rev=688110&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java Fri Aug 22 09:15:02 2008
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+
+/**
+ * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout
+ * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure
+ * that the default value is being replaced.
+ */
+public class SyncWaitDelayTest extends QpidTestCase
+{
+    protected static final Logger _logger = LoggerFactory.getLogger(SyncWaitDelayTest.class);
+
+    final String QpidHome = System.getProperty("QPID_HOME");
+    final File _configFile = new File(QpidHome, "etc/config.xml");
+
+    private String VIRTUALHOST = "test";
+    protected long POST_COMMIT_DELAY = 1000L;
+    protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000;
+
+    protected Connection _connection;
+    protected Session _session;
+    protected Queue _queue;
+    protected MessageConsumer _consumer;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        stopBroker();
+        if (!_configFile.exists())
+        {
+            fail("Unable to test without config file:" + _configFile);
+        }
+
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile);
+
+        //Disable management on broker.
+        config.getConfiguration().setProperty("management.enabled", "false");
+
+        Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+        testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.SlowMessageStore");
+        testVirtualhost.setProperty("store.delays.commitTran.post", POST_COMMIT_DELAY);
+
+        startBroker(1, config);
+
+        //Set the syncWrite timeout to be just larger than the delay on the commitTran.
+        setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
+
+        _connection = getConnection();
+
+        //Create Queue        
+        _queue = (Queue) getInitialContext().lookup("queue");
+
+        //Create Consumer
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        //Ensure Queue exists
+        _session.createConsumer(_queue).close();
+    }
+
+
+    public void test() throws JMSException
+    {
+        MessageProducer producer = _session.createProducer(_queue);
+
+        Message message = _session.createTextMessage("Message");
+
+        producer.send(message);
+
+        long start = System.nanoTime();
+
+        _logger.info("Calling Commit");
+
+        try
+        {
+            _session.commit();
+            long end = System.nanoTime();
+            long time = (end - start);
+            // As we are using Nano time ensure to multiply up the millis.
+            assertTrue("Commit was quickier than the delay:" + time, time > 1000000L * POST_COMMIT_DELAY);
+            assertFalse("Commit was to slower than the build in default", time > 1000000L * 1000 * 30);
+        }
+        catch (JMSException e)
+        {
+            fail(e.getMessage());
+        }
+
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java?rev=688110&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java Fri Aug 22 09:15:02 2008
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQTimeoutException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */
+public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest
+{
+    protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class);
+
+    public void setUp() throws Exception
+    {
+        POST_COMMIT_DELAY = 1000L;
+
+        //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied
+        SYNC_WRITE_TIMEOUT = 500L;
+
+        super.setUp();
+    }
+
+    public void test() throws JMSException
+    {
+        MessageProducer producer = _session.createProducer(_queue);
+
+        Message message = _session.createTextMessage("Message");
+
+        producer.send(message);
+
+        _logger.info("Calling Commit");
+
+        long start = System.nanoTime();
+        try
+        {
+            _session.commit();
+            fail("Commit occured even though syncWait timeout is shorter than delay in commit");
+        }
+        catch (JMSException e)
+        {
+            assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
+            assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+            // As we are using Nano time ensure to multiply up the millis.            
+            assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
+        }
+
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=688110&r1=688109&r2=688110&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Fri Aug 22 09:15:02 2008
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.StringTokenizer;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +36,7 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +52,8 @@
 
     protected long RECEIVE_TIMEOUT = 1000l;
 
+    private Map<String, String> _setProperties = new HashMap<String, String>();
+
     /**
      * Some tests are excluded when the property test.excludes is set to true.
      * An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -131,6 +135,8 @@
 
     private static final String QPID_HOME = "QPID_HOME";
 
+    protected int DEFAULT_VM_PORT = 1;
+
     protected String _broker = System.getProperty(BROKER, VM);
     private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
     private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
@@ -282,12 +288,26 @@
         }
     }
 
+    public void startBroker(int port, ConfigurationFileApplicationRegistry config) throws Exception
+    {
+        ApplicationRegistry.initialise(config, port);
+        startBroker(port);
+    }
+
     public void startBroker() throws Exception
     {
+        startBroker(0);
+    }
+
+    public void startBroker(int port) throws Exception
+    {
         if (_broker.equals(VM))
         {
+            //If we are starting on port 0 use the default VM_PORT
+            port = port == 0 ? DEFAULT_VM_PORT : port;
+
             // create an in_VM broker
-            TransportConnection.createVMBroker(1);
+            TransportConnection.createVMBroker(port);
         }
         else if (!_broker.equals(EXTERNAL))
         {
@@ -362,6 +382,11 @@
 
     public void stopBroker() throws Exception
     {
+        stopBroker(0);
+    }
+
+    public void stopBroker(int port) throws Exception
+    {
         _logger.info("stopping broker: " + _broker);
         if (_brokerProcess != null)
         {
@@ -372,12 +397,40 @@
         }
         else if (_broker.equals(VM))
         {
-            TransportConnection.killAllVMBrokers();
-            ApplicationRegistry.removeAll();
+            port = port == 0 ? DEFAULT_VM_PORT : port;
+
+            TransportConnection.killVMBroker(port);
+            ApplicationRegistry.remove(port);
         }
         _brokerStarted = false;
     }
 
+    protected void setSystemProperty(String property, String value)
+    {
+        if (!_setProperties.containsKey(property))
+        {
+            _setProperties.put(property, System.getProperty(property));
+        }
+
+        System.setProperty(property, value);
+    }
+
+    protected void revertSystemProperties()
+    {
+        for (String key : _setProperties.keySet())
+        {
+            String value = _setProperties.get(key);
+            if (value != null)
+            {
+                System.setProperty(key, value);
+            }
+            else
+            {
+                System.clearProperty(key);
+            }
+        }
+    }
+
     /**
      * Check whether the broker is an 0.8
      *
@@ -395,8 +448,13 @@
 
     public void restartBroker() throws Exception
     {
-        stopBroker();
-        startBroker();
+        restartBroker(0);
+    }
+
+    public void restartBroker(int port) throws Exception
+    {
+        stopBroker(port);
+        startBroker(port);
     }
 
     /**
@@ -508,6 +566,8 @@
                 c.close();
             }
         }
+
+        revertSystemProperties();
     }
 
 }