You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:14 UTC

[05/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the OpenWire test suite (https://issues.apache.org/jira/browse/ARTEMIS-463)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index e792228..0a127dd 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -30,38 +32,46 @@ import javax.jms.Session;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class FailoverUpdateURIsTest extends TestCase {
+public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
 
    private static final String QUEUE_NAME = "test.failoverupdateuris";
    private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
 
-   String firstTcpUri = "tcp://localhost:61616";
-   String secondTcpUri = "tcp://localhost:61626";
+   String firstTcpUri = newURI(0);
+   String secondTcpUri = newURI(10);
    Connection connection = null;
-   BrokerService bs1 = null;
-   BrokerService bs2 = null;
+   EmbeddedJMS server0 = null;
+   EmbeddedJMS server1 = null;
 
-   @Override
+   @After
    public void tearDown() throws Exception {
       if (connection != null) {
          connection.close();
       }
-      if (bs1 != null) {
-         bs1.stop();
+      if (server0 != null) {
+         server0.stop();
       }
-      if (bs2 != null) {
-         bs2.stop();
+      if (server1 != null) {
+         server1.stop();
       }
    }
 
+   @Test
    public void testUpdateURIsViaFile() throws Exception {
 
-      String targetDir = "target/" + getName();
+      String targetDir = "target/testUpdateURIsViaFile";
       new File(targetDir).mkdir();
       File updateFile = new File(targetDir + "/updateURIsFile.txt");
       LOG.info(updateFile);
@@ -72,8 +82,9 @@ public class FailoverUpdateURIsTest extends TestCase {
       out.write(firstTcpUri.getBytes());
       out.close();
 
-      bs1 = createBroker("bs1", firstTcpUri);
-      bs1.start();
+      Configuration config0 = createConfig(0);
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server0.start();
 
       // no failover uri's to start with, must be read from file...
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
@@ -86,14 +97,14 @@ public class FailoverUpdateURIsTest extends TestCase {
       Message message = session.createTextMessage("Test message");
       producer.send(message);
       Message msg = consumer.receive(2000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
-      bs1.stop();
-      bs1.waitUntilStopped();
-      bs1 = null;
+      server0.stop();
+      server0 = null;
 
-      bs2 = createBroker("bs2", secondTcpUri);
-      bs2.start();
+      Configuration config1 = createConfig(10);
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server1.start();
 
       // add the transport uri for broker number 2
       out = new FileOutputStream(updateFile, true);
@@ -103,25 +114,18 @@ public class FailoverUpdateURIsTest extends TestCase {
 
       producer.send(message);
       msg = consumer.receive(2000);
-      assertNotNull(msg);
-   }
-
-   private BrokerService createBroker(String name, String tcpUri) throws Exception {
-      BrokerService bs = new BrokerService();
-      bs.setBrokerName(name);
-      bs.setUseJmx(false);
-      bs.setPersistent(false);
-      bs.addConnector(tcpUri);
-      return bs;
+      Assert.assertNotNull(msg);
    }
 
+   @Test
    public void testAutoUpdateURIs() throws Exception {
-
-      bs1 = new BrokerService();
-      bs1.setUseJmx(false);
-      TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
-      transportConnector.setUpdateClusterClients(true);
-      bs1.start();
+      Map<String, String> params = new HashMap<String, String>();
+      params.put("updateClusterClients", "true");
+      Configuration config0 = createConfig("localhost", 0, params);
+      deployClusterConfiguration(config0, 10);
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server0.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
       connection = cf.createConnection();
@@ -133,24 +137,23 @@ public class FailoverUpdateURIsTest extends TestCase {
       Message message = session.createTextMessage("Test message");
       producer.send(message);
       Message msg = consumer.receive(4000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
-      bs2 = createBroker("bs2", secondTcpUri);
-      NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
-      networkConnector.setDuplex(true);
-      bs2.start();
-      LOG.info("started brokerService 2");
-      bs2.waitUntilStarted();
+      Configuration config1 = createConfig(10);
+      deployClusterConfiguration(config1, 0);
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server1.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
 
       TimeUnit.SECONDS.sleep(4);
 
       LOG.info("stopping brokerService 1");
-      bs1.stop();
-      bs1.waitUntilStopped();
-      bs1 = null;
+      server0.stop();
+      server0 = null;
 
       producer.send(message);
       msg = consumer.receive(4000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
index ae637ef..a028832 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
@@ -43,4 +43,5 @@ public class FailoverUriTest extends TransportUriTest {
    public static Test suite() {
       return suite(FailoverUriTest.class);
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index 34e7333..dad241c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -26,9 +27,13 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.transport.TransportListener;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -36,19 +41,20 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
-public class InitalReconnectDelayTest {
+public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
 
    private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
-   protected BrokerService broker1;
-   protected BrokerService broker2;
+   protected EmbeddedJMS server1;
+   protected EmbeddedJMS server2;
+
+//   protected BrokerService broker1;
+//   protected BrokerService broker2;
 
    @Test
    public void testInitialReconnectDelay() throws Exception {
 
-      String uriString = "failover://(tcp://localhost:" +
-         broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
-         ",tcp://localhost:" +
-         broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+      String uriString = "failover://(" + newURI(1) +
+         "," + newURI(2) +
          ")?randomize=false&initialReconnectDelay=15000";
 
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -67,7 +73,7 @@ public class InitalReconnectDelayTest {
       //Halt the broker1...
       LOG.info("Stopping the Broker1...");
       start = (new Date()).getTime();
-      broker1.stop();
+      server1.stop();
 
       LOG.info("Attempting to send... failover should kick in...");
       producer.send(session.createTextMessage("TEST"));
@@ -81,10 +87,8 @@ public class InitalReconnectDelayTest {
    @Test
    public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
 
-      String uriString = "failover://(tcp://localhost:" +
-         broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
-         ",tcp://localhost:" +
-         broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+      String uriString = "failover://(" + newURI(1) +
+         "," + newURI(2) +
          ")?randomize=false&maxReconnectAttempts=0";
 
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -124,7 +128,7 @@ public class InitalReconnectDelayTest {
       calls.set(0);
 
       LOG.info("Stopping the Broker1...");
-      broker1.stop();
+      server1.stop();
 
       LOG.info("Attempting to send... failover should throw on disconnect");
       try {
@@ -140,25 +144,19 @@ public class InitalReconnectDelayTest {
    @Before
    public void setUp() throws Exception {
 
-      final String dataDir = "target/data/shared";
+      Configuration config1 = createConfig(1);
+      Configuration config2 = createConfig(2);
 
-      broker1 = new BrokerService();
+      deployClusterConfiguration(config1, 2);
+      deployClusterConfiguration(config2, 1);
 
-      broker1.setBrokerName("broker1");
-      broker1.setDeleteAllMessagesOnStartup(true);
-      broker1.setDataDirectory(dataDir);
-      broker1.addConnector("tcp://localhost:0");
-      broker1.setUseJmx(false);
-      broker1.start();
-      broker1.waitUntilStarted();
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
 
-      broker2 = new BrokerService();
-      broker2.setBrokerName("broker2");
-      broker2.setDataDirectory(dataDir);
-      broker2.setUseJmx(false);
-      broker2.addConnector("tcp://localhost:0");
-      broker2.start();
-      broker2.waitUntilStarted();
+      server1.start();
+      server2.start();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
 
    }
 
@@ -172,16 +170,8 @@ public class InitalReconnectDelayTest {
 
    @After
    public void tearDown() throws Exception {
-
-      if (broker1.isStarted()) {
-         broker1.stop();
-         broker1.waitUntilStopped();
-      }
-
-      if (broker2.isStarted()) {
-         broker2.stop();
-         broker2.waitUntilStopped();
-      }
+      server1.stop();
+      server2.stop();
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
index 4ba5516..83d43af 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
@@ -28,29 +28,33 @@ import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
-import junit.framework.TestCase;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.mock.MockTransport;
-import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReconnectTest extends TestCase {
+public class ReconnectTest extends OpenwireArtemisBaseTest {
 
    public static final int MESSAGES_PER_ITTERATION = 10;
    public static final int WORKER_COUNT = 10;
 
    private static final Logger LOG = LoggerFactory.getLogger(ReconnectTest.class);
 
-   private BrokerService bs;
+   private EmbeddedJMS bs;
    private URI tcpUri;
    private final AtomicInteger resumedCount = new AtomicInteger();
    private final AtomicInteger interruptedCount = new AtomicInteger();
@@ -102,7 +106,7 @@ public class ReconnectTest extends TestCase {
       }
 
       public void start() {
-         new Thread(this).start();
+         new Thread(this, name).start();
       }
 
       public void stop() {
@@ -129,13 +133,19 @@ public class ReconnectTest extends TestCase {
             MessageConsumer consumer = session.createConsumer(queue);
             MessageProducer producer = session.createProducer(queue);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
             while (!stop.get()) {
+
                for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
-                  producer.send(session.createTextMessage("TEST:" + i));
+                  TextMessage text = session.createTextMessage(name + " TEST:" + i);
+                  text.setStringProperty("myprop", name + " TEST:" + i);
+                  producer.send(text);
                }
+
                for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
-                  consumer.receive();
+                  TextMessage m = (TextMessage) consumer.receive();
                }
+
                iterations.incrementAndGet();
             }
             session.close();
@@ -159,11 +169,12 @@ public class ReconnectTest extends TestCase {
       public synchronized void assertNoErrors() {
          if (error != null) {
             error.printStackTrace();
-            fail("Worker " + name + " got Exception: " + error);
+            Assert.fail("Worker " + name + " got Exception: " + error);
          }
       }
    }
 
+   @Test
    public void testReconnects() throws Exception {
 
       for (int k = 1; k < 10; k++) {
@@ -181,7 +192,7 @@ public class ReconnectTest extends TestCase {
                LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
                Thread.sleep(1000);
             }
-            assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
+            Assert.assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
             workers[i].assertNoErrors();
          }
 
@@ -192,7 +203,7 @@ public class ReconnectTest extends TestCase {
             workers[i].failConnection();
          }
 
-         assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
+         Assert.assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
@@ -201,7 +212,7 @@ public class ReconnectTest extends TestCase {
          }, TimeUnit.SECONDS.toMillis(60)));
 
          // Wait for the connections to re-establish...
-         assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
+         Assert.assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
@@ -220,26 +231,25 @@ public class ReconnectTest extends TestCase {
       }
    }
 
-   @Override
-   protected void setUp() throws Exception {
-      bs = new BrokerService();
-      bs.setPersistent(false);
-      bs.setUseJmx(true);
-      TransportConnector connector = bs.addConnector("tcp://localhost:0");
+   @Before
+   public void setUp() throws Exception {
+      Configuration config = createConfig(0);
+      bs = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
       bs.start();
-      tcpUri = connector.getConnectUri();
+      tcpUri = new URI(newURI(0));
+
       workers = new Worker[WORKER_COUNT];
       for (int i = 0; i < WORKER_COUNT; i++) {
-         workers[i] = new Worker("" + i);
+         workers[i] = new Worker("worker-" + i);
          workers[i].start();
       }
    }
 
-   @Override
-   protected void tearDown() throws Exception {
+   @After
+   public void tearDown() throws Exception {
       for (int i = 0; i < WORKER_COUNT; i++) {
          workers[i].stop();
       }
-      new ServiceStopper().stop(bs);
+      bs.stop();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
index 3a55473..ed6040d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
@@ -24,15 +24,16 @@ import java.util.concurrent.CountDownLatch;
 import javax.jms.Connection;
 import javax.net.ServerSocketFactory;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class SlowConnectionTest extends TestCase {
+public class SlowConnectionTest {
 
    private CountDownLatch socketReadyLatch = new CountDownLatch(1);
 
+   @Test
    public void testSlowConnection() throws Exception {
 
       MockBroker broker = new MockBroker();
@@ -57,7 +58,7 @@ public class SlowConnectionTest extends TestCase {
       }).start();
 
       int count = 0;
-      assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
+      Assert.assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
             int count = 0;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 5016e30..5759547 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -16,22 +16,40 @@
  */
 package org.apache.activemq.transport.failover;
 
-public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-   private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
-   private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
-   private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
-   private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
-   private static final String BROKER_A_NAME = "BROKERA";
-   private static final String BROKER_B_NAME = "BROKERB";
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
+
+   private static final int NUMBER_OF_CLIENTS = 30;
+   private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+   private EmbeddedJMS server0;
+   private EmbeddedJMS server1;
+   private String clientUrl;
+
+   @Test
    public void testTwoBrokersRestart() throws Exception {
-      createBrokerA(false, "", null, null);
-      createBrokerB(false, "", null, null);
-      getBroker(BROKER_B_NAME).waitUntilStarted();
 
       Thread.sleep(2000);
-      setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
 
       Thread.sleep(5000);
@@ -39,59 +57,113 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
       assertClientsConnectedToTwoBrokers();
       assertClientsConnectionsEvenlyDistributed(.35);
 
-      getBroker(BROKER_A_NAME).stop();
-      getBroker(BROKER_A_NAME).waitUntilStopped();
-      removeBroker(BROKER_A_NAME);
+      server0.stop();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
 
       Thread.sleep(1000);
 
-      assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+      assertAllConnectedTo(newURI("127.0.0.1", 1));
 
       Thread.sleep(5000);
 
-      createBrokerA(false, "", null, null);
-      getBroker(BROKER_A_NAME).waitUntilStarted();
+      server0.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
       Thread.sleep(5000);
 
+      // Needs updateClusterClients, updateClusterClientsOnRemove and rebalanceClusterClients set to true (configured in setUp).
       assertClientsConnectedToTwoBrokers();
       assertClientsConnectionsEvenlyDistributed(.35);
    }
 
-   private void createBrokerA(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_A_NAME) == null) {
-         addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-         addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-         }
-         else {
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-         }
-         getBroker(BROKER_A_NAME).start();
+
+   @Before
+   public void setUp() throws Exception {
+      HashMap<String, String> map = new HashMap<>();
+      map.put("rebalanceClusterClients", "true");
+      map.put("updateClusterClients", "true");
+      map.put("updateClusterClientsOnRemove", "true");
+      Configuration config0 = createConfig("127.0.0.1", 0, map);
+      Configuration config1 = createConfig("127.0.0.1", 1, map);
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+      clientUrl = "failover://(" + newURI("127.0.0.1", 0) + "," + newURI("127.0.0.1", 1) + ")";
+
+      server0.start();
+      server1.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      for (ActiveMQConnection conn : connections) {
+         conn.close();
       }
+      server0.stop();
+      server1.stop();
    }
 
-   private void createBrokerB(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_B_NAME) == null) {
-         addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-         addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+   protected void createClients() throws Exception {
+      createClients(NUMBER_OF_CLIENTS);
+   }
+
+   protected void createClients(int numOfClients) throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+      for (int i = 0; i < numOfClients; i++) {
+         ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+         c.start();
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = s.createQueue(getClass().getName());
+         MessageConsumer consumer = s.createConsumer(queue);
+         connections.add(c);
+      }
+   }
+
+   protected void assertClientsConnectedToTwoBrokers() {
+      Set<String> set = new HashSet<String>();
+      for (ActiveMQConnection c : connections) {
+         if (c.getTransportChannel().getRemoteAddress() != null) {
+            set.add(c.getTransportChannel().getRemoteAddress());
          }
-         else {
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+      }
+      Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+   }
+
+   protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
+      Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+      int total = 0;
+      for (ActiveMQConnection c : connections) {
+         String key = c.getTransportChannel().getRemoteAddress();
+         if (key != null) {
+            total++;
+            if (clientConnectionCounts.containsKey(key)) {
+               double count = clientConnectionCounts.get(key);
+               count += 1.0;
+               clientConnectionCounts.put(key, count);
+            }
+            else {
+               clientConnectionCounts.put(key, 1.0);
+            }
          }
-         getBroker(BROKER_B_NAME).start();
+      }
+      Set<String> keys = clientConnectionCounts.keySet();
+      for (String key : keys) {
+         double count = clientConnectionCounts.get(key);
+         double percentage = count / total;
+         System.out.println(count + " of " + total + " connections for " + key + " = " + percentage);
+         Assert.assertTrue("Connections distribution expected to be >= than " + minimumPercentage + ".  Actuall distribution was " + percentage + " for connection " + key, percentage >= minimumPercentage);
+      }
+   }
+
+   protected void assertAllConnectedTo(String url) throws Exception {
+      for (ActiveMQConnection c : connections) {
+         Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
index dc369be..a372b79 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
@@ -26,45 +26,42 @@ import javax.jms.Session;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class FanoutTest extends TestCase {
+public class FanoutTest extends OpenwireArtemisBaseTest {
 
-   BrokerService broker1;
-   BrokerService broker2;
+   EmbeddedJMS[] servers = new EmbeddedJMS[2];
 
    ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true");
    Connection producerConnection;
    Session producerSession;
    int messageCount = 100;
 
-   @Override
+   @Before
    public void setUp() throws Exception {
-      broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false");
-      broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false");
-
-      broker1.start();
-      broker2.start();
-
-      broker1.waitUntilStarted();
-      broker2.waitUntilStarted();
+      setUpNonClusterServers(servers);
 
       producerConnection = producerFactory.createConnection();
       producerConnection.start();
       producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
 
-   @Override
+   @After
    public void tearDown() throws Exception {
       producerSession.close();
       producerConnection.close();
 
-      broker1.stop();
-      broker2.stop();
+      shutDownNonClusterServers(servers);
    }
 
+   @Test
    public void testSendReceive() throws Exception {
 
       MessageProducer prod = createProducer();
@@ -76,7 +73,6 @@ public class FanoutTest extends TestCase {
 
       assertMessagesReceived("tcp://localhost:61616");
       assertMessagesReceived("tcp://localhost:61617");
-
    }
 
    protected MessageProducer createProducer() throws Exception {
@@ -95,7 +91,7 @@ public class FanoutTest extends TestCase {
       listener.assertMessagesReceived(messageCount);
 
       consumer.close();
-      consumerConnection.close();
       consumerSession.close();
+      consumerConnection.close();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
index 7e52f13..2cfc136 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
@@ -18,51 +18,111 @@ package org.apache.activemq.transport.fanout;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
 
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.mock.MockTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FanoutTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
+   public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+   protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+   protected long idGenerator;
+   protected int msgIdGenerator;
+   protected int maxWait = 10000;
 
    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class);
 
-   public ActiveMQDestination destination;
-   public int deliveryMode;
+   private EmbeddedJMS server;
+   private EmbeddedJMS remoteServer;
+
+   private ActiveMQDestination destination;
+   private int deliveryMode;
 
-   public static Test suite() {
-      return suite(FanoutTransportBrokerTest.class);
+   @Parameterized.Parameters(name="test-{index}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][]{
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+      });
    }
 
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
+   public FanoutTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+      this.deliveryMode = deliveryMode;
+      this.destination = destination;
    }
 
-   public void initCombosForTestPublisherFansout() {
-      addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-      addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")});
+   @Before
+   public void setUp() throws Exception {
+      Configuration config0 = createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      Configuration config1 = createConfig(1);
+      remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
+      remoteServer.start();
+
+   }
+   @After
+   public void tearDown() throws Exception {
+      for (StubConnection conn : connections) {
+         try {
+            conn.stop();
+         }
+         catch (Exception e) {
+         }
+      }
+      try {
+         remoteServer.stop();
+      }
+      catch (Exception e) {
+      }
+      try {
+         server.stop();
+      }
+      catch (Exception e) {
+      }
    }
 
+   @Test
    public void testPublisherFansout() throws Exception {
-
       // Start a normal consumer on the local broker
       StubConnection connection1 = createConnection();
       ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -94,21 +154,28 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       // Send the message using the fail over publisher.
       connection3.request(createMessage(producerInfo3, destination, deliveryMode));
 
-      assertNotNull(receiveMessage(connection1));
+      Assert.assertNotNull(receiveMessage(connection1));
       assertNoMessagesLeft(connection1);
 
-      assertNotNull(receiveMessage(connection2));
+      Assert.assertNotNull(receiveMessage(connection2));
       assertNoMessagesLeft(connection2);
 
    }
 
+   /*
    public void initCombosForTestPublisherWaitsForServerToBeUp() {
       addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
       addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")});
    }
+*/
 
+   @Test
    public void testPublisherWaitsForServerToBeUp() throws Exception {
 
+      if (name.getMethodName().contains("test-0") || name.getMethodName().contains("test-2")) {
+         System.out.println("Discarding invalid test: " + name.getMethodName());
+         return;
+      }
       // Start a normal consumer on the local broker
       StubConnection connection1 = createConnection();
       ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -140,19 +207,18 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       // Send the message using the fail over publisher.
       connection3.request(createMessage(producerInfo3, destination, deliveryMode));
 
-      assertNotNull(receiveMessage(connection1));
+      Assert.assertNotNull(receiveMessage(connection1));
       assertNoMessagesLeft(connection1);
 
-      assertNotNull(receiveMessage(connection2));
+      Assert.assertNotNull(receiveMessage(connection2));
       assertNoMessagesLeft(connection2);
 
       final CountDownLatch publishDone = new CountDownLatch(1);
 
       // The MockTransport is on the remote connection.
       // Slip in a new transport filter after the MockTransport
-      MockTransport mt = connection3.getTransport().narrow(MockTransport.class);
+      MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
       mt.install(new TransportFilter(mt.getNext()) {
-         @Override
          public void oneway(Object command) throws IOException {
             LOG.info("Dropping: " + command);
             // just eat it! to simulate a recent failure.
@@ -161,7 +227,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
 
       // Send a message (async) as this will block
       new Thread() {
-         @Override
          public void run() {
             // Send the message using the fail over publisher.
             try {
@@ -175,7 +240,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       }.start();
 
       // Assert that we block:
-      assertFalse(publishDone.await(3, TimeUnit.SECONDS));
+      Assert.assertFalse(publishDone.await(3, TimeUnit.SECONDS));
 
       // Restart the remote server. State should be re-played and the publish
       // should continue.
@@ -184,26 +249,127 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       LOG.info("Broker Restarted");
 
       // This should reconnect, and resend
-      assertTrue(publishDone.await(20, TimeUnit.SECONDS));
+      Assert.assertTrue(publishDone.await(20, TimeUnit.SECONDS));
 
    }
 
-   @Override
    protected String getLocalURI() {
       return "tcp://localhost:61616";
    }
 
-   @Override
    protected String getRemoteURI() {
       return "tcp://localhost:61617";
    }
 
    protected StubConnection createFanoutConnection() throws Exception {
-      URI fanoutURI = new URI("fanout://(static://(" + connector.getServer().getConnectURI() + "," + "mock://" + remoteConnector.getServer().getConnectURI() + "))?fanOutQueues=true");
+      URI fanoutURI = new URI("fanout://(static://(" + newURI(0) + "," + "mock://" + newURI(1) + "))?fanOutQueues=true");
       Transport transport = TransportFactory.connect(fanoutURI);
       StubConnection connection = new StubConnection(transport);
       connections.add(connection);
       return connection;
    }
 
+
+   protected StubConnection createConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(0)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected StubConnection createRemoteConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(1)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected ConnectionInfo createConnectionInfo() throws Exception {
+      ConnectionInfo info = new ConnectionInfo();
+      info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+      info.setClientId(info.getConnectionId().getValue());
+      return info;
+   }
+
+   protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+      SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+      return info;
+   }
+
+   protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+                                             ActiveMQDestination destination) throws Exception {
+      ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+      info.setBrowser(false);
+      info.setDestination(destination);
+      info.setPrefetchSize(1000);
+      info.setDispatchAsync(false);
+      return info;
+   }
+
+   protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+      ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+      return info;
+   }
+
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+      Message message = createMessage(producerInfo, destination);
+      message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+      return message;
+   }
+
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+      message.setDestination(destination);
+      message.setPersistent(false);
+      try {
+         message.setText("Test Message Payload.");
+      }
+      catch (MessageNotWriteableException e) {
+      }
+      return message;
+   }
+
+   public Message receiveMessage(StubConnection connection) throws InterruptedException {
+      return receiveMessage(connection, maxWait);
+   }
+
+   public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+         if (o == null) {
+            return null;
+         }
+         if (o instanceof MessageDispatch) {
+
+            MessageDispatch dispatch = (MessageDispatch) o;
+            if (dispatch.getMessage() == null) {
+               return null;
+            }
+            dispatch.setMessage(dispatch.getMessage().copy());
+            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+            return dispatch.getMessage();
+         }
+      }
+   }
+
+   protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+      long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+         if (o == null) {
+            return;
+         }
+         if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+            Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+         }
+      }
+   }
+   protected void restartRemoteBroker() throws Exception {
+      remoteServer.stop();
+      Thread.sleep(2000);
+      remoteServer.start();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 619190f..01f6963 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -173,11 +173,8 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
    }
 
    public void testClientHang() throws Exception {
-
-      //
       // Manually create a client transport so that it does not send KeepAlive
-      // packets.
-      // this should simulate a client hang.
+      // packets. This should simulate a client hang.
       clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
       clientTransport.setTransportListener(new TransportListener() {
          @Override
@@ -205,9 +202,10 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
          public void transportResumed() {
          }
       });
+
       clientTransport.start();
       WireFormatInfo info = new WireFormatInfo();
-      info.setVersion(OpenWireFormat.DEFAULT_VERSION);
+      info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
       info.setMaxInactivityDuration(1000);
       clientTransport.oneway(info);
 
@@ -242,19 +240,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
     * @throws URISyntaxException
     */
    public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-
       startClient();
 
-      addCombinationValues("clientInactivityLimit", new Object[]{Long.valueOf(1000)});
-      addCombinationValues("serverInactivityLimit", new Object[]{Long.valueOf(1000)});
-      addCombinationValues("serverRunOnCommand", new Object[]{new Runnable() {
+      addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
+      addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
+      addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
          @Override
          public void run() {
             try {
                LOG.info("Sleeping");
                Thread.sleep(4000);
-            }
-            catch (InterruptedException e) {
+            } catch (InterruptedException e) {
             }
          }
       }});
@@ -272,5 +268,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
       assertEquals(0, clientErrorCount.get());
       assertEquals(0, serverErrorCount.get());
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
index 3a1c9a4..d380246 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
@@ -124,6 +124,7 @@ public class SslBrokerServiceTest extends TransportBrokerTestSupport {
    private void makeSSLConnection(SSLContext context,
                                   String enabledSuites[],
                                   TransportConnector connector) throws Exception, UnknownHostException, SocketException {
+      System.out.println("-----connector: " + connector);
       SSLSocket sslSocket = (SSLSocket) context.getSocketFactory().createSocket("localhost", connector.getUri().getPort());
 
       if (enabledSuites != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
index 1c34d83..38188ce 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
@@ -37,10 +37,11 @@ public class TcpTransportBindTest extends EmbeddedBrokerTestSupport {
     */
    @Override
    protected void setUp() throws Exception {
+      disableWrapper = true;
       bindAddress = addr + "?transport.reuseAddress=true&transport.soTimeout=1000";
       super.setUp();
 
-      addr = broker.getTransportConnectors().get(0).getPublishableConnectString();
+      addr = newURI("localhost", 0);
    }
 
    public void testConnect() throws Exception {
@@ -58,7 +59,7 @@ public class TcpTransportBindTest extends EmbeddedBrokerTestSupport {
          @Override
          public void run() {
             try {
-               broker.stop();
+               artemisBroker.stop();
             }
             catch (Exception e) {
                e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index 9ae82ac..ce9aff9 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -23,7 +23,6 @@ import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,6 +165,7 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
 
    @Override
    protected void setUp() throws Exception {
+      disableWrapper = true;
       bindAddress = "tcp://localhost:61616";
       super.setUp();
    }
@@ -183,15 +183,6 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
       super.tearDown();
    }
 
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseJmx(false);
-      answer.setPersistent(isPersistent());
-      answer.addConnector(bindAddress);
-      return answer;
-   }
-
    public static Test suite() {
       return suite(TransportUriTest.class);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
deleted file mode 100644
index 3791848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-
-public class VMTransportBrokerNameTest extends TestCase {
-
-   private static final String MY_BROKER = "myBroker";
-   final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)";
-
-   public void testBrokerName() throws Exception {
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
-      ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
-      assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
-
-      // verify Broker is there with name
-      ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false"));
-      Connection c2 = cfbyName.createConnection();
-
-      assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER));
-      assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER);
-      assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1);
-
-      c1.close();
-      c2.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
deleted file mode 100644
index 52e4b88..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import junit.framework.Test;
-
-import org.apache.activemq.transport.TransportBrokerTestSupport;
-
-public class VMTransportBrokerTest extends TransportBrokerTestSupport {
-
-   @Override
-   protected String getBindLocation() {
-      return "vm://localhost";
-   }
-
-   public static Test suite() {
-      return suite(VMTransportBrokerTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
deleted file mode 100644
index dbc7f29..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.IOExceptionSupport;
-
-/**
- * Used to see if the VM transport starts an embedded broker on demand.
- */
-public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport {
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class);
-   }
-
-   public void testConsumerPrefetchAtOne() throws Exception {
-
-      // Make sure the broker is created due to the connection being started.
-      assertNull(BrokerRegistry.getInstance().lookup("localhost"));
-      StubConnection connection = createConnection();
-      assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-
-      // Start a producer and consumer
-      ConnectionInfo connectionInfo = createConnectionInfo();
-      SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-      ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-      connection.send(connectionInfo);
-      connection.send(sessionInfo);
-      connection.send(producerInfo);
-
-      ActiveMQQueue destination = new ActiveMQQueue("TEST");
-
-      ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-      consumerInfo.setPrefetchSize(1);
-      connection.send(consumerInfo);
-
-      // Send 2 messages to the broker.
-      connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-      connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-
-      // Make sure only 1 message was delivered.
-      Message m = receiveMessage(connection);
-      assertNotNull(m);
-      assertNoMessagesLeft(connection);
-
-      // Make sure the broker is shutdown when the connection is stopped.
-      assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-      connection.stop();
-      assertNull(BrokerRegistry.getInstance().lookup("localhost"));
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      // Don't call super since it manually starts up a broker.
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      // Don't call super since it manually tears down a broker.
-   }
-
-   @Override
-   protected StubConnection createConnection() throws Exception {
-      try {
-         Transport transport = TransportFactory.connect(new URI("vm://localhost?broker.persistent=false"));
-         StubConnection connection = new StubConnection(transport);
-         return connection;
-      }
-      catch (URISyntaxException e) {
-         throw IOExceptionSupport.create(e);
-      }
-   }
-
-}