You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/07/01 17:53:17 UTC

svn commit: r790233 [2/3] - in /activemq/sandbox/activemq-flow: activemq-all/ activemq-all/src/test/java/org/apache/activemq/legacy/transport/ activemq-all/src/test/java/org/apache/activemq/legacy/transport/nio/ activemq-all/src/test/java/org/apache/ac...

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java?rev=790233&r1=790232&r2=790233&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java Wed Jul  1 15:53:16 2009
@@ -16,14 +16,20 @@
  */
 package org.apache.activemq.openwire;
 
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.DeliveryMode;
 
-import junit.framework.Test;
-
+import org.apache.activemq.Service;
+import org.apache.activemq.apollo.Combinator;
+import org.apache.activemq.apollo.Combinator.BeanFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -33,275 +39,334 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.legacy.openwireprotocol.BrokerTestSupport;
 import org.apache.activemq.legacy.openwireprotocol.StubConnection;
+import org.testng.Assert;
+import org.testng.IHookCallBack;
+import org.testng.IHookable;
+import org.testng.ITestResult;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class BrokerTest implements BeanFactory, IHookable{
+
+	public Combinator combinator() {
+		return new Combinator();
+	}
+
+	public Object createBean() throws Exception {
+		return new BrokerTestScenario();
+	}
+	
+	public void run(IHookCallBack callback, ITestResult result) {
+		Object [] params = new Object[]{};
+		try {
+			Field field = callback.getClass().getDeclaredField("val$parameters");
+			field.setAccessible(true);
+			params = (Object[]) field.get(callback);
+			for (int i = 0; i < params.length; i++) {
+				if( params[i] instanceof Service) {
+					((Service)params[i]).start();
+				}
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		try {
+			callback.runTestMethod(result);
+		} finally {
+			try {
+				for (int i = 0; i < params.length; i++) {
+				if( params[i] instanceof Service) {
+						((Service)params[i]).stop();
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+	
+	@DataProvider(name = "default")
+	public Object[][] createData0(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", 0)
+			.combinationsAsParameterArgBeans(this);
+	}
+
+	@DataProvider(name = "deliveryMode-combinations")
+	public Object[][] createData1(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT)
+			.combinationsAsParameterArgBeans(this);
+	}
+	
+	@DataProvider(name = "deliveryMode-queue-combinations")
+	public Object[][] createData2(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT)
+			.put("destinationType", ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE)
+			.combinationsAsParameterArgBeans(this);
+	}
+	
+	@DataProvider(name = "deliveryMode-perm-destinations-combinations")
+	public Object[][] createData3(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT)
+			.put("destinationType", ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE)
+			.combinationsAsParameterArgBeans(this);
+	}	
+
+	@DataProvider(name = "deliveryMode-all-destinations-combinations")
+	public Object[][] createData4(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT)
+			.put("destinationType", ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, 
+									ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE)
+			.combinationsAsParameterArgBeans(this);
+	}
+	
+	@DataProvider(name = "deliveryMode-durableConsumer-combinations")
+	public Object[][] createData5(Method method) throws Exception {
+		return combinator()
+			.put("deliveryMode", DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT)
+			.put("durableConsumer", true, false)
+			.combinationsAsParameterArgBeans(this);
+	}	
+	
 
-public class BrokerTest extends BrokerTestSupport {
-
-    public ActiveMQDestination destination;
-    public int deliveryMode;
-    public int prefetch;
-    public byte destinationType;
-    public boolean durableConsumer;
-    protected static final int MAX_NULL_WAIT=500;
-    
-    public void initCombosForTestTopicNoLocal() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
-
-    public void testTopicNoLocal() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testTopicNoLocal(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQTopic("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
         consumerInfo1.setNoLocal(true);
         connection1.send(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = scenario.createProducerInfo(sessionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         connection2.send(producerInfo2);
 
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setRetroactive(true);
         consumerInfo2.setPrefetchSize(100);
         consumerInfo2.setNoLocal(true);
         connection2.request(consumerInfo2);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
 
         // The 2nd connection should get the messages.
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection2);
-            assertNotNull("Message: "+i, m1);
+            Message m1 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m1, "Message: "+i);
         }
 
         // Send a message with the 2nd connection
-        Message message = createMessage(producerInfo2, destination, deliveryMode);
+        Message message = scenario.createMessage(producerInfo2, destination, scenario.deliveryMode);
         connection2.send(message);
 
         // The first connection should not see the initial 4 local messages sent
         // but should
         // see the messages from connection 2.
-        Message m = receiveMessage(connection1);
-        assertNotNull(m);
+        Message m = scenario.receiveMessage(connection1);
+        Assert.assertNotNull(m);
         assertEquals(message.getMessageId(), m.getMessageId());
 
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
-    }
-    
-    public void initCombosForTestQueueSendThenAddConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    public void testQueueSendThenAddConsumer() throws Exception {
+	@Test(dataProvider = "deliveryMode-queue-combinations")
+    public void testQueueSendThenAddConsumer(BrokerTestScenario scenario) throws Exception {
 
         // Start a producer
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        StubConnection connection = scenario.createConnection();
+        ConnectionInfo connectionInfo = scenario.createConnectionInfo();
+        SessionInfo sessionInfo = scenario.createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo);
         connection.send(connectionInfo);
         connection.send(sessionInfo);
         connection.send(producerInfo);
 
-        destination = createDestinationInfo(connection, connectionInfo, destinationType);
+        ActiveMQDestination destination = scenario.createDestinationInfo(connection, connectionInfo, scenario.destinationType);
 
         // Send a message to the broker.
-        connection.send(createMessage(producerInfo, destination, deliveryMode));
+        connection.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         // Start the consumer
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        ConsumerInfo consumerInfo = scenario.createConsumerInfo(sessionInfo, destination);
         connection.send(consumerInfo);
 
         // Make sure the message was delivered.
-        Message m = receiveMessage(connection);
-        assertNotNull(m);
+        Message m = scenario.receiveMessage(connection);
+        Assert.assertNotNull(m);
 
     }
-    public void initCombosForTestCompositeSend() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
-    }
 
-    public void testCompositeSend() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testCompositeSend(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
-        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
+        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destinationA);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
 
-        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
+        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", scenario.destinationType);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destinationB);
         consumerInfo2.setRetroactive(true);
         consumerInfo2.setPrefetchSize(100);
         connection2.request(consumerInfo2);
 
-        // Send the messages to the composite destination.
+        // Send the messages to the composite scenario.destination.
         ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
-                                                                                         destinationType);
+                                                                                         scenario.destinationType);
         for (int i = 0; i < 4; i++) {
-            connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode));
+            connection1.request(scenario.createMessage(producerInfo1, compositeDestination, scenario.deliveryMode));
         }
 
         // The messages should have been delivered to both the A and B
-        // destination.
+        // scenario.destination.
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            Message m2 = receiveMessage(connection2);
+            Message m1 = scenario.receiveMessage(connection1);
+            Message m2 = scenario.receiveMessage(connection2);
 
-            assertNotNull(m1);
-            assertNotNull(m2);
+            Assert.assertNotNull(m1);
+            Assert.assertNotNull(m2);
 
             assertEquals(m1.getMessageId(), m2.getMessageId());
             assertEquals(compositeDestination, m1.getOriginalDestination());
             assertEquals(compositeDestination, m2.getOriginalDestination());
 
-            connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-            connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection1.request(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection2.request(scenario.createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
 
         }
 
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
-
-        connection1.send(closeConnectionInfo(connectionInfo1));
-        connection2.send(closeConnectionInfo(connectionInfo2));
-    }
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
 
-    public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        connection1.send(scenario.closeConnectionInfo(connectionInfo1));
+        connection2.send(scenario.closeConnectionInfo(connectionInfo2));
     }
 
-    public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testQueueOnlyOnceDeliveryWith2Consumers(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(1);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(1);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         connection2.request(consumerInfo2);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            Message m2 = receiveMessage(connection2);
+            Message m1 = scenario.receiveMessage(connection1);
+            Message m2 = scenario.receiveMessage(connection2);
 
-            assertNotNull("m1 is null for index: " + i, m1);
-            assertNotNull("m2 is null for index: " + i, m2);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
+            Assert.assertNotNull(m2, ("m2 is null for index: " + i));
 
-            assertNotSame(m1.getMessageId(), m2.getMessageId());
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+            Assert.assertNotSame(m2.getMessageId(), m1.getMessageId());
+            connection1.send(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection2.send(scenario.createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    public void initCombosForTestQueueBrowserWith2Consumers() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
-
-    public void testQueueBrowserWith2Consumers() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testQueueBrowserWith2Consumers(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(10);
         connection1.request(consumerInfo1);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
         //as the messages are sent async - need to synchronize the last
         //one to ensure they arrive in the order we want
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         // Setup a second connection with a queue browser.
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(1);
         consumerInfo2.setBrowser(true);
         connection2.send(connectionInfo2);
@@ -311,38 +376,39 @@
         List<Message> messages = new ArrayList<Message>();
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
             messages.add(m1);
         }
 
         for (int i = 0; i < 4; i++) {
             Message m1 = messages.get(i);
-            Message m2 = receiveMessage(connection2);
-            assertNotNull("m2 is null for index: " + i, m2);
+            Message m2 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m2, ("m2 is null for index: " + i));
             assertEquals(m1.getMessageId(), m2.getMessageId());
-            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
+            connection2.send(scenario.createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
         }
 
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
     }
 
     
     /*
      * change the order of the above test
      */
-    public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception {
+	@Test(dataProvider = "default")
+    public void testQueueBrowserWith2ConsumersBrowseFirst(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
-        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        scenario.deliveryMode = DeliveryMode.NON_PERSISTENT;
         
         
         // Setup a second connection with a queue browser.
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(10);
         consumerInfo2.setBrowser(true);
         connection2.send(connectionInfo2);
@@ -350,67 +416,68 @@
         connection2.request(consumerInfo2);
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(10);
         connection1.request(consumerInfo1);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
         //as the messages are sent async - need to synchronize the last
         //one to ensure they arrive in the order we want
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
 
         List<Message> messages = new ArrayList<Message>();
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
             messages.add(m1);
         }
 
         // no messages present in queue browser as there were no messages when it
         // was created
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
+	@Test(dataProvider = "default")
+    public void testQueueBrowserWith2ConsumersInterleaved(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
-        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        scenario.deliveryMode = DeliveryMode.NON_PERSISTENT;
         
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(10);
         connection1.request(consumerInfo1);
 
         // Send the messages
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
         
         // Setup a second connection with a queue browser.
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(1);
         consumerInfo2.setBrowser(true);
         connection2.send(connectionInfo2);
@@ -418,119 +485,104 @@
         connection2.request(consumerInfo2);
 
         
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
         //as the messages are sent async - need to synchronize the last
         //one to ensure they arrive in the order we want
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         
         List<Message> messages = new ArrayList<Message>();
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
             messages.add(m1);
         }
 
         for (int i = 0; i < 1; i++) {
             Message m1 = messages.get(i);
-            Message m2 = receiveMessage(connection2);
-            assertNotNull("m2 is null for index: " + i, m2);
+            Message m2 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m2, ("m2 is null for index: " + i));
             assertEquals(m1.getMessageId(), m2.getMessageId());
-            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
+            connection2.send(scenario.createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
         }
 
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
+        scenario.assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    
-    public void initCombosForTestConsumerPrefetchAndStandardAck() {
-        addCombinationValues("deliveryMode", new Object[] {
-        // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                             Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
-    }
-
-    public void testConsumerPrefetchAndStandardAck() throws Exception {
+	@Test(dataProvider = "deliveryMode-all-destinations-combinations")
+    public void testConsumerPrefetchAndStandardAck(BrokerTestScenario scenario) throws Exception {
 
         // Start a producer and consumer
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        StubConnection connection = scenario.createConnection();
+        ConnectionInfo connectionInfo = scenario.createConnectionInfo();
+        SessionInfo sessionInfo = scenario.createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo);
         connection.send(connectionInfo);
         connection.send(sessionInfo);
         connection.send(producerInfo);
 
-        destination = createDestinationInfo(connection, connectionInfo, destinationType);
+        ActiveMQDestination destination = scenario.createDestinationInfo(connection, connectionInfo, scenario.destinationType);
 
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        ConsumerInfo consumerInfo = scenario.createConsumerInfo(sessionInfo, destination);
         consumerInfo.setPrefetchSize(1);
         connection.send(consumerInfo);
 
         // Send 3 messages to the broker.
-        connection.send(createMessage(producerInfo, destination, deliveryMode));
-        connection.send(createMessage(producerInfo, destination, deliveryMode));
-        connection.request(createMessage(producerInfo, destination, deliveryMode));
+        connection.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         // Make sure only 1 message was delivered.
-        Message m1 = receiveMessage(connection);
-        assertNotNull(m1);
-        assertNoMessagesLeft(connection);
+        Message m1 = scenario.receiveMessage(connection);
+        Assert.assertNotNull(m1);
+        scenario.assertNoMessagesLeft(connection);
 
         // Acknowledge the first message. This should cause the next message to
         // get dispatched.
-        connection.send(createAck(consumerInfo, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+        connection.send(scenario.createAck(consumerInfo, m1, 1, MessageAck.STANDARD_ACK_TYPE));
 
-        Message m2 = receiveMessage(connection);
-        assertNotNull(m2);
-        connection.send(createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+        Message m2 = scenario.receiveMessage(connection);
+        Assert.assertNotNull(m2);
+        connection.send(scenario.createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE));
 
-        Message m3 = receiveMessage(connection);
-        assertNotNull(m3);
-        connection.send(createAck(consumerInfo, m3, 1, MessageAck.STANDARD_ACK_TYPE));
-
-        connection.send(closeConnectionInfo(connectionInfo));
-    }
+        Message m3 = scenario.receiveMessage(connection);
+        Assert.assertNotNull(m3);
+        connection.send(scenario.createAck(consumerInfo, m3, 1, MessageAck.STANDARD_ACK_TYPE));
 
-    public void initCombosForTestConsumerCloseCausesRedelivery() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+        connection.send(scenario.closeConnectionInfo(connectionInfo));
     }
 
-    public void testConsumerCloseCausesRedelivery() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testConsumerCloseCausesRedelivery(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         connection1.request(consumerInfo1);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
 
         // Receive the messages.
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            assertFalse(m1.isRedelivered());
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
+            Assert.assertFalse(m1.isRedelivered());
         }
 
         // Close the consumer without acking.. this should cause re-delivery of
@@ -538,63 +590,64 @@
         connection1.send(consumerInfo1.createRemoveCommand());
 
         // Create another consumer that should get the messages again.
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo2.setPrefetchSize(100);
         connection1.request(consumerInfo2);
 
         // Receive the messages.
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            assertTrue(m1.isRedelivered());
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
+            Assert.assertTrue(m1.isRedelivered());
         }
-        assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection1);
 
     }
 
-    public void testTopicDurableSubscriptionCanBeRestored() throws Exception {
+	@Test(dataProvider = "default")
+    public void testTopicDurableSubscriptionCanBeRestored(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQTopic("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
         connectionInfo1.setClientId("clientid1");
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         consumerInfo1.setSubscriptionName("test");
         connection1.send(consumerInfo1);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
-        connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
-        connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
-        connection1.request(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
+        connection1.send(scenario.createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
+        connection1.send(scenario.createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
+        connection1.send(scenario.createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
+        connection1.request(scenario.createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
 
         // Get the messages
         Message m = null;
         for (int i = 0; i < 2; i++) {
-            m = receiveMessage(connection1);
-            assertNotNull(m);
+            m = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m);
         }
         // Ack the last message.
-        connection1.send(createAck(consumerInfo1, m, 2, MessageAck.STANDARD_ACK_TYPE));
+        connection1.send(scenario.createAck(consumerInfo1, m, 2, MessageAck.STANDARD_ACK_TYPE));
         // Close the connection.
-        connection1.request(closeConnectionInfo(connectionInfo1));
+        connection1.request(scenario.closeConnectionInfo(connectionInfo1));
         connection1.stop();
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
         connectionInfo2.setClientId("clientid1");
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(100);
         consumerInfo2.setSubscriptionName("test");
 
@@ -604,75 +657,70 @@
 
         // Get the rest of the messages
         for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection2);
-            assertNotNull("m1 is null for index: " + i, m1);
+            Message m1 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m1, ("m1 is null for index: " + i));
         }
-        assertNoMessagesLeft(connection2);
-    }
-
-    public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    public void testTopicConsumerOnlySeeMessagesAfterCreation() throws Exception {
+	@Test(dataProvider = "deliveryMode-durableConsumer-combinations")
+    public void testTopicConsumerOnlySeeMessagesAfterCreation(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQTopic("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
         connectionInfo1.setClientId("A");
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
         // Send the 1st message
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
 
         // Create the durable subscription.
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        if (durableConsumer) {
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
+        if (scenario.durableConsumer) {
             consumerInfo1.setSubscriptionName("test");
         }
         consumerInfo1.setPrefetchSize(100);
         connection1.send(consumerInfo1);
 
-        Message m = createMessage(producerInfo1, destination, deliveryMode);
+        Message m = scenario.createMessage(producerInfo1, destination, scenario.deliveryMode);
         connection1.send(m);
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
 
         // Subscription should skip over the first message
-        Message m2 = receiveMessage(connection1);
-        assertNotNull(m2);
+        Message m2 = scenario.receiveMessage(connection1);
+        Assert.assertNotNull(m2);
         assertEquals(m.getMessageId(), m2.getMessageId());
-        m2 = receiveMessage(connection1);
-        assertNotNull(m2);
+        m2 = scenario.receiveMessage(connection1);
+        Assert.assertNotNull(m2);
 
-        assertNoMessagesLeft(connection1);
+        scenario.assertNoMessagesLeft(connection1);
     }
 
-    public void initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE});
-    }
+//    public void initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() {
+//        addCombinationValues("scenario.deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+//                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+//        addCombinationValues("scenario.durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE});
+//    }
 
     //
     // TODO: need to reimplement this since we don't fail when we send to a
     // non-existant
-    // destination. But if we can access the Region directly then we should be
+    // scenario.destination. But if we can access the Region directly then we should be
     // able to
-    // check that if the destination was removed.
+    // check that if the scenario.destination was removed.
     // 
     // public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
-    // addCombinationValues( "deliveryMode", new Object[]{
+    // addCombinationValues( "scenario.deliveryMode", new Object[]{
     // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
     // Integer.valueOf(DeliveryMode.PERSISTENT)} );
-    // addCombinationValues( "destinationType", new Object[]{
+    // addCombinationValues( "scenario.destinationType", new Object[]{
     // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
     // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
     // }
@@ -681,38 +729,38 @@
     // Exception {
     //        
     // // Setup a first connection
-    // StubConnection connection1 = createConnection();
-    // ConnectionInfo connectionInfo1 = createConnectionInfo();
-    // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-    // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+    // StubConnection connection1 = scenario.createConnection();
+    // ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+    // SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+    // ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
     // connection1.send(connectionInfo1);
     // connection1.send(sessionInfo1);
     // connection1.send(producerInfo1);
     //
-    // destination = createDestinationInfo(connection1, connectionInfo1,
-    // destinationType);
+    // scenario.destination = scenario.createDestinationInfo(connection1, connectionInfo1,
+    // scenario.destinationType);
     //        
-    // StubConnection connection2 = createConnection();
-    // ConnectionInfo connectionInfo2 = createConnectionInfo();
-    // SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-    // ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+    // StubConnection connection2 = scenario.createConnection();
+    // ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+    // SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+    // ProducerInfo producerInfo2 = scenario.createProducerInfo(sessionInfo2);
     // connection2.send(connectionInfo2);
     // connection2.send(sessionInfo2);
     // connection2.send(producerInfo2);
     //
-    // // Send from connection2 to connection1's temp destination. Should
+    // // Send from connection2 to connection1's temp scenario.destination. Should
     // succeed.
-    // connection2.send(createMessage(producerInfo2, destination,
-    // deliveryMode));
+    // connection2.send(scenario.createMessage(producerInfo2, scenario.destination,
+    // scenario.deliveryMode));
     //        
     // // Close connection 1
-    // connection1.request(closeConnectionInfo(connectionInfo1));
+    // connection1.request(scenario.closeConnectionInfo(connectionInfo1));
     //        
     // try {
-    // // Send from connection2 to connection1's temp destination. Should not
+    // // Send from connection2 to connection1's temp scenario.destination. Should not
     // succeed.
-    // connection2.request(createMessage(producerInfo2, destination,
-    // deliveryMode));
+    // connection2.request(scenario.createMessage(producerInfo2, scenario.destination,
+    // scenario.deliveryMode));
     // fail("Expected JMSException.");
     // } catch ( JMSException success ) {
     // }
@@ -720,38 +768,38 @@
     // }
 
     // public void initCombosForTestTempDestinationsAreNotAutoCreated() {
-    // addCombinationValues( "deliveryMode", new Object[]{
+    // addCombinationValues( "scenario.deliveryMode", new Object[]{
     // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
     // Integer.valueOf(DeliveryMode.PERSISTENT)} );
-    // addCombinationValues( "destinationType", new Object[]{
+    // addCombinationValues( "scenario.destinationType", new Object[]{
     // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
     // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
     // }
     //    
     //   
 
-    // We create temp destination on demand now so this test case is no longer
+    // We create temp scenario.destination on demand now so this test case is no longer
     // valid.
     //    
     // public void testTempDestinationsAreNotAutoCreated() throws Exception {
     //        
     // // Setup a first connection
-    // StubConnection connection1 = createConnection();
-    // ConnectionInfo connectionInfo1 = createConnectionInfo();
-    // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-    // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+    // StubConnection connection1 = scenario.createConnection();
+    // ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+    // SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+    // ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
     // connection1.send(connectionInfo1);
     // connection1.send(sessionInfo1);
     // connection1.send(producerInfo1);
     //
-    // destination =
+    // scenario.destination =
     // ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1",
-    // destinationType);
+    // scenario.destinationType);
     //            
-    // // Should not be able to send to a non-existant temp destination.
+    // // Should not be able to send to a non-existant temp scenario.destination.
     // try {
-    // connection1.request(createMessage(producerInfo1, destination,
-    // deliveryMode));
+    // connection1.request(scenario.createMessage(producerInfo1, scenario.destination,
+    // scenario.deliveryMode));
     // fail("Expected JMSException.");
     // } catch ( JMSException success ) {
     // }
@@ -759,37 +807,38 @@
     // }
 
     
-    public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
+//    public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() {
+//        addCombinationValues("scenario.deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+//                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+//    }
 
-    public void testExclusiveQueueDeliversToOnlyOneConsumer() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testExclusiveQueueDeliversToOnlyOneConsumer(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(1);
         consumerInfo1.setExclusive(true);
         connection1.send(consumerInfo1);
 
         // Send a message.. this should make consumer 1 the exclusive owner.
-        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        connection1.request(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setPrefetchSize(1);
         consumerInfo2.setExclusive(true);
         connection2.send(connectionInfo2);
@@ -798,318 +847,292 @@
 
         // Second message should go to consumer 1 even though consumer 2 is
         // ready for dispatch.
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         // Acknowledge the first 2 messages
         for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            connection1.send(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
         // Close the first consumer.
-        connection1.send(closeConsumerInfo(consumerInfo1));
+        connection1.send(scenario.closeConsumerInfo(consumerInfo1));
 
         // The last two messages should now go the the second consumer.
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo, destination, scenario.deliveryMode));
 
         for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection2);
-            assertNotNull(m1);
-            connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message m1 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m1);
+            connection2.send(scenario.createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
-        assertNoMessagesLeft(connection2);
-    }
-
-    public void initCombosForTestWildcardConsume() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+        scenario.assertNoMessagesLeft(connection2);
     }
 
-    public void testWildcardConsume() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testWildcardConsume(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
         // setup the wildcard consumer.
         ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("WILD.*.TEST",
-                                                                                         destinationType);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
+                                                                                         scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, compositeDestination);
         consumerInfo1.setPrefetchSize(100);
         connection1.send(consumerInfo1);
 
         // These two message should NOT match the wild card.
-        connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.CARD",
-                                                                                            destinationType),
-                                       deliveryMode));
-        connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.TEST",
-                                                                                            destinationType),
-                                       deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.CARD",
+                                                                                            scenario.destinationType),
+                                       scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.TEST",
+                                                                                            scenario.destinationType),
+                                       scenario.deliveryMode));
 
         // These two message should match the wild card.
-        ActiveMQDestination d1 = ActiveMQDestination.createDestination("WILD.CARD.TEST", destinationType);
-        connection1.send(createMessage(producerInfo1, d1, deliveryMode));
+        ActiveMQDestination d1 = ActiveMQDestination.createDestination("WILD.CARD.TEST", scenario.destinationType);
+        connection1.send(scenario.createMessage(producerInfo1, d1, scenario.deliveryMode));
         
-        Message m = receiveMessage(connection1);
-        assertNotNull(m);
+        Message m = scenario.receiveMessage(connection1);
+        Assert.assertNotNull(m);
         assertEquals(d1, m.getDestination());
 
-        ActiveMQDestination d2 = ActiveMQDestination.createDestination("WILD.FOO.TEST", destinationType);
-        connection1.request(createMessage(producerInfo1, d2, deliveryMode));
-        m = receiveMessage(connection1);
-        assertNotNull(m);
+        ActiveMQDestination d2 = ActiveMQDestination.createDestination("WILD.FOO.TEST", scenario.destinationType);
+        connection1.request(scenario.createMessage(producerInfo1, d2, scenario.deliveryMode));
+        m = scenario.receiveMessage(connection1);
+        Assert.assertNotNull(m);
         assertEquals(d2, m.getDestination());
 
-        assertNoMessagesLeft(connection1);
-        connection1.send(closeConnectionInfo(connectionInfo1));
+        scenario.assertNoMessagesLeft(connection1);
+        connection1.send(scenario.closeConnectionInfo(connectionInfo1));
     }
 
-    public void initCombosForTestCompositeConsume() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
-    }
-
-    public void testCompositeConsume() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testCompositeConsume(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
         // setup the composite consumer.
         ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
-                                                                                         destinationType);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
+                                                                                         scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, compositeDestination);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
         connection1.send(consumerInfo1);
 
         // Publish to the two destinations
-        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
-        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
+        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", scenario.destinationType);
+        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", scenario.destinationType);
 
-        // Send a message to each destination .
-        connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
+        // Send a message to each scenario.destination .
+        connection1.send(scenario.createMessage(producerInfo1, destinationA, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destinationB, scenario.deliveryMode));
 
         // The consumer should get both messages.
         for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
         }
 
-        assertNoMessagesLeft(connection1);
-        connection1.send(closeConnectionInfo(connectionInfo1));
+        scenario.assertNoMessagesLeft(connection1);
+        connection1.send(scenario.closeConnectionInfo(connectionInfo1));
     }
 
-
-    public void initCombosForTestConnectionCloseCascades() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"),
-                                                          new ActiveMQQueue("TEST")});
-    }
-
-    public void testConnectionCloseCascades() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testConnectionCloseCascades(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ActiveMQDestination destination = ActiveMQDestination.createDestination("TEST",  scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         consumerInfo1.setNoLocal(true);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = scenario.createProducerInfo(sessionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         connection2.send(producerInfo2);
 
         // Send the messages
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            connection1.send(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
         // give the async ack a chance to perculate and validate all are currently consumed
-        assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
+        Assert.assertNull(connection1.getDispatchQueue().poll(scenario.MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
 
         // Close the connection, this should in turn close the consumer.
-        connection1.request(closeConnectionInfo(connectionInfo1));
+        connection1.request(scenario.closeConnectionInfo(connectionInfo1));
 
         // Send another message, connection1 should not get the message.
-        connection2.request(createMessage(producerInfo2, destination, deliveryMode));
-
-        assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
-    }
+        connection2.request(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
-    public void initCombosForTestSessionCloseCascades() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"),
-                                                          new ActiveMQQueue("TEST")});
+        Assert.assertNull(connection1.getDispatchQueue().poll(scenario.MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
     }
 
-    public void testSessionCloseCascades() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testSessionCloseCascades(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ActiveMQDestination destination = ActiveMQDestination.createDestination("TEST",  scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         consumerInfo1.setNoLocal(true);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = scenario.createProducerInfo(sessionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         connection2.send(producerInfo2);
 
         // Send the messages
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            connection1.send(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
         // Close the session, this should in turn close the consumer.
-        connection1.request(closeSessionInfo(sessionInfo1));
+        connection1.request(scenario.closeSessionInfo(sessionInfo1));
 
         // Send another message, connection1 should not get the message.
-        connection2.request(createMessage(producerInfo2, destination, deliveryMode));
+        connection2.request(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
-        assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
+        Assert.assertNull(connection1.getDispatchQueue().poll(scenario.MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
     }
 
-    public void initCombosForTestConsumerClose() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"),
-                                                          new ActiveMQQueue("TEST")});
-    }
+//    public void initCombosForTestConsumerClose() {
+//        addCombinationValues("scenario.deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+//                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+//        addCombinationValues("scenario.destination", new Object[] {new ActiveMQTopic("TEST"),
+//                                                          new ActiveMQQueue("TEST")});
+//    }
 
-    public void testConsumerClose() throws Exception {
+	@Test(dataProvider = "deliveryMode-perm-destinations-combinations")
+    public void testConsumerClose(BrokerTestScenario scenario) throws Exception {
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ActiveMQDestination destination = ActiveMQDestination.createDestination("TEST",  scenario.destinationType);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         consumerInfo1.setNoLocal(true);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = scenario.createProducerInfo(sessionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         connection2.send(producerInfo2);
 
         // Send the messages
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
-        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
+        connection2.send(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            connection1.send(scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
 
         // give the async ack a chance to perculate and validate all are currently consumed
-        assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
+        Assert.assertNull(connection1.getDispatchQueue().poll(scenario.MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
  
         // Close the consumer.
-        connection1.request(closeConsumerInfo(consumerInfo1));
+        connection1.request(scenario.closeConsumerInfo(consumerInfo1));
 
         // Send another message, connection1 should not get the message.
-        connection2.request(createMessage(producerInfo2, destination, deliveryMode));
-
-        assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
-    }
+        connection2.request(scenario.createMessage(producerInfo2, destination, scenario.deliveryMode));
 
-
-    public void initCombosForTopicDispatchIsBroadcast() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        Assert.assertNull(connection1.getDispatchQueue().poll(scenario.MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
     }
 
-    public void testTopicDispatchIsBroadcast() throws Exception {
+	@Test(dataProvider = "deliveryMode-combinations")
+    public void testTopicDispatchIsBroadcast(BrokerTestScenario scenario) throws Exception {
 
         ActiveMQDestination destination = new ActiveMQTopic("TEST");
 
         // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo1);
 
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
         connection1.request(consumerInfo1);
 
         // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        StubConnection connection2 = scenario.createConnection();
+        ConnectionInfo connectionInfo2 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo2 = scenario.createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = scenario.createConsumerInfo(sessionInfo2, destination);
         consumerInfo2.setRetroactive(true);
         consumerInfo2.setPrefetchSize(100);
         connection2.send(connectionInfo2);
@@ -1117,372 +1140,298 @@
         connection2.request(consumerInfo2);
 
         // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
+        connection1.send(scenario.createMessage(producerInfo1, destination, scenario.deliveryMode));
 
         // Get the messages
         for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            m1 = receiveMessage(connection2);
-            assertNotNull(m1);
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            m1 = scenario.receiveMessage(connection2);
+            Assert.assertNotNull(m1);
         }
     }
 
-    public void initCombosForTestQueueDispatchedAreRedeliveredOnConsumerClose() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
-    }
-
-    public void testQueueDispatchedAreRedeliveredOnConsumerClose() throws Exception {
+	@Test(dataProvider = "deliveryMode-queue-combinations")

[... 476 lines stripped ...]