You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/19 06:08:36 UTC

[30/67] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
index ffdfc6e..d34f943 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -16,22 +16,25 @@
  */
 package org.apache.activemq.broker;
 
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.ServerSocket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
@@ -44,6 +47,8 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.network.ConnectionFilter;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.jms.JmsConnector;
 import org.apache.activemq.proxy.ProxyConnector;
@@ -57,6 +62,7 @@ import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionHandler;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,10 +74,12 @@ import org.slf4j.LoggerFactory;
 public class BrokerService implements Service {
 
    public static final String DEFAULT_PORT = "61616";
+   public static final AtomicInteger RANDOM_PORT_BASE = new AtomicInteger(51616);
    public static final String DEFAULT_BROKER_NAME = "localhost";
    public static final String BROKER_VERSION;
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final long DEFAULT_START_TIMEOUT = 600000L;
+   public static boolean disableWrapper = false;
 
    public String SERVER_SIDE_KEYSTORE;
    public String KEYSTORE_PASSWORD;
@@ -99,6 +107,11 @@ public class BrokerService implements Service {
    private PolicyMap destinationPolicy;
    private SystemUsage systemUsage;
 
+   private boolean isClustered = true;
+   private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
+
+   private TemporaryFolder tmpfolder;
+
    public static WeakHashMap<Broker, Exception> map = new WeakHashMap<>();
 
    static {
@@ -131,6 +144,10 @@ public class BrokerService implements Service {
 
    @Override
    public void start() throws Exception {
+      File targetTmp = new File("./target/tmp");
+      targetTmp.mkdirs();
+      tmpfolder = new TemporaryFolder(targetTmp);
+      tmpfolder.create();
       Exception e = new Exception();
       e.fillInStackTrace();
       startBroker(startAsync);
@@ -188,10 +205,10 @@ public class BrokerService implements Service {
       LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId});
 
       if (broker != null) {
-         System.out.println("______________________stopping broker: " + broker.getClass().getName());
          broker.stop();
          broker = null;
       }
+      tmpfolder.delete();
       LOG.info("Apache ActiveMQ Artemis {} ({}, {}) is shutdown", new Object[]{getBrokerVersion(), getBrokerName(), brokerId});
    }
 
@@ -200,7 +217,7 @@ public class BrokerService implements Service {
 
    public Broker getBroker() throws Exception {
       if (broker == null) {
-         broker = createBroker();
+         broker = createBroker(tmpfolder.getRoot());
       }
       return broker;
    }
@@ -220,13 +237,14 @@ public class BrokerService implements Service {
       this.brokerName = str.trim();
    }
 
-   protected Broker createBroker() throws Exception {
-      broker = createBrokerWrapper();
+   protected Broker createBroker(File temporaryFile) throws Exception {
+      LOG.debug("Creating broker wrapper with temporary directory {}", temporaryFile.getAbsolutePath());
+      broker = createBrokerWrapper(temporaryFile);
       return broker;
    }
 
-   private Broker createBrokerWrapper() {
-      return new ArtemisBrokerWrapper(this);
+   private Broker createBrokerWrapper(File temporaryFile) {
+      return new ArtemisBrokerWrapper(this, temporaryFile);
    }
 
    public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception {
@@ -382,10 +400,6 @@ public class BrokerService implements Service {
    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
    }
 
-   public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
-      return null;
-   }
-
    public TransportConnector getConnectorByName(String connectorName) {
       return null;
    }
@@ -407,8 +421,17 @@ public class BrokerService implements Service {
    public void setSchedulerDirectoryFile(File schedulerDirectory) {
    }
 
+   public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
+      return addNetworkConnector(new URI(discoveryAddress));
+   }
+
+   public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
+      NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
+      return addNetworkConnector(connector);
+   }
+
    public List<NetworkConnector> getNetworkConnectors() {
-      return new ArrayList<>();
+      return this.networkConnectors;
    }
 
    public void setSchedulerSupport(boolean schedulerSupport) {
@@ -471,6 +494,30 @@ public class BrokerService implements Service {
    }
 
    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
+      connector.setBrokerService(this);
+
+      LOG.debug("Adding network connector, this broker uri: {}", this.getConnectURI());
+      connector.setLocalUri(this.getConnectURI());
+      // Set a connection filter so that the connector does not establish loop
+      // back connections.
+      connector.setConnectionFilter(new ConnectionFilter() {
+         @Override
+         public boolean connectTo(URI location) {
+            List<TransportConnector> transportConnectors = getTransportConnectors();
+            for (TransportConnector tc : transportConnectors) {
+               try {
+                  if (location.equals(tc.getConnectUri())) {
+                     return false;
+                  }
+               } catch (Throwable e) {
+                  LOG.debug("Ignoring transport connector whose connect URI cannot be resolved", e);
+               }
+            }
+            return true;
+         }
+      });
+
+      networkConnectors.add(connector);
       return connector;
    }
 
@@ -486,19 +533,63 @@ public class BrokerService implements Service {
 
    public TransportConnector addConnector(URI bindAddress) throws Exception {
       Integer port = bindAddress.getPort();
+      String host = bindAddress.getHost();
       FakeTransportConnector connector = null;
-      if (port != 0) {
-         connector = new FakeTransportConnector(bindAddress);
-         this.transportConnectors.add(connector);
-         this.extraConnectors.add(port);
+
+      host = (host == null || host.length() == 0) ? "localhost" : host;
+      if ("0.0.0.0".equals(host)) {
+         host = "localhost";
       }
-      else {
-         connector = new FakeTransportConnector(new URI(this.getDefaultUri()));
-         this.transportConnectors.add(connector);
+
+      if (port == 0) {
+         //In actual impl in amq5, after connector has been added the socket
+         //is bound already. This means in case of 0 port uri, the random
+         //port is available after this call. With artemis wrapper however
+         //the real binding happens during broker start. To work around this
+         //we use manually calculated port for that.
+         port = getPseudoRandomPort();
+
       }
+
+      System.out.println("Now host is: " + host);
+      bindAddress = new URI(bindAddress.getScheme(), bindAddress.getUserInfo(),
+              host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment());
+
+      connector = new FakeTransportConnector(bindAddress);
+      this.transportConnectors.add(connector);
+      this.extraConnectors.add(port);
+
       return connector;
    }
 
+   private int getPseudoRandomPort() {
+      int candidate;
+      do {
+         candidate = RANDOM_PORT_BASE.getAndIncrement();
+      } while (!checkPort(candidate));
+      return candidate;
+   }
+
+   /**
+    * Probes whether the given TCP port can currently be bound on this host.
+    * The probe socket is closed again before returning, so a {@code true}
+    * result is only a hint: another process may still take the port before
+    * the broker actually binds it.
+    *
+    * @param port the port number to probe
+    * @return {@code true} if a server socket could be opened on the port,
+    *         {@code false} if opening (or closing) the probe socket failed
+    */
+   private static boolean checkPort(final int port) {
+      try (ServerSocket probe = new ServerSocket(port)) {
+         return true;
+      }
+      catch (Exception e) {
+         // in use, not permitted, or outside the valid port range
+         return false;
+      }
+   }
+
    public void setCacheTempDestinations(boolean cacheTempDestinations) {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
index 5c052a6..fb3c242 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.artemiswrapper;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -65,7 +64,6 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
-import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,20 +81,19 @@ public abstract class ArtemisBrokerBase implements Broker {
    protected volatile boolean stopped;
    protected BrokerId brokerId = new BrokerId("Artemis Broker");
    protected BrokerService bservice;
-   protected TemporaryFolder temporaryFolder = new TemporaryFolder();
-   protected String testDir;
+
+   protected final File temporaryFolder;
+   protected final String testDir;
    protected boolean realStore = false;
 
    protected ActiveMQServer server;
 
    protected boolean enableSecurity = false;
 
-   public ArtemisBrokerBase() {
-      try {
-         this.temporaryFolder.create();
-      }
-      catch (IOException e) {
-      }
+   public ArtemisBrokerBase(File temporaryFolder) {
+      this.temporaryFolder = temporaryFolder;
+      this.testDir = temporaryFolder.getAbsolutePath();
+
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 61d6250..3ad6072 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.artemiswrapper;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,20 +47,16 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
    protected final Map<String, SimpleString> testQueues = new HashMap<>();
    protected JMSServerManagerImpl jmsServer;
 
-   public ArtemisBrokerWrapper(BrokerService brokerService) {
+   public ArtemisBrokerWrapper(BrokerService brokerService, File temporaryFolder) {
+      super(temporaryFolder);
       this.bservice = brokerService;
    }
 
    @Override
    public void start() throws Exception {
-      testDir = temporaryFolder.getRoot().getAbsolutePath();
       clearDataRecreateServerDirs();
       server = createServer(realStore, true);
       server.getConfiguration().getAcceptorConfigurations().clear();
-      HashMap<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, "61616");
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE,CORE");
-      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
 
       Configuration serverConfig = server.getConfiguration();
 
@@ -82,9 +79,11 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
       commonSettings.setDeadLetterAddress(dla);
       commonSettings.setAutoCreateJmsQueues(true);
 
-      serverConfig.getAcceptorConfigurations().add(transportConfiguration);
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      if (bservice.extraConnectors.size() == 0) {
+         serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE");
+      }
       if (this.bservice.enableSsl()) {
-         params = new HashMap<>();
          params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
          params.put(TransportConstants.PORT_PROP_NAME, 61611);
          params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
@@ -102,14 +101,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
       }
 
       for (Integer port : bservice.extraConnectors) {
-         if (port.intValue() != 61616) {
-            //extra port
-            params = new HashMap<>();
-            params.put(TransportConstants.PORT_PROP_NAME, port.intValue());
-            params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
-            TransportConfiguration extraTransportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-            serverConfig.getAcceptorConfigurations().add(extraTransportConfiguration);
-         }
+         serverConfig.addAcceptorConfiguration("homePort" + port, "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE");
       }
 
       serverConfig.setSecurityEnabled(enableSecurity);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
new file mode 100644
index 0000000..be9cf06
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.artemiswrapper;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
+import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
+public class OpenwireArtemisBaseTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder;
+   @Rule
+   public TestName name = new TestName();
+
+   public OpenwireArtemisBaseTest() {
+      File tmpRoot = new File("./target/tmp");
+      tmpRoot.mkdirs();
+      temporaryFolder = new TemporaryFolder(tmpRoot);
+      //The wrapper stuff will automatically create a default
+      //server on a normal connection factory, which will
+      //cause problems with clustering tests, which starts
+      //all servers explicitly. Setting this to true
+      //can prevent the auto-creation from happening.
+      BrokerService.disableWrapper = true;
+   }
+
+
+   public String getTmp() {
+      return getTmpFile().getAbsolutePath();
+   }
+
+   public File getTmpFile() {
+      return temporaryFolder.getRoot();
+   }
+
+   protected String getJournalDir(int serverID, boolean backup) {
+      return getTmp() + "/journal_" + serverID + "_" + backup;
+   }
+
+   protected String getBindingsDir(int serverID, boolean backup) {
+      return getTmp() + "/binding_" + serverID + "_" + backup;
+   }
+
+   protected String getPageDir(int serverID, boolean backup) {
+      return getTmp() + "/paging_" + serverID + "_" + backup;
+   }
+
+   protected String getLargeMessagesDir(int serverID, boolean backup) {
+      return getTmp() + "/large-msg_" + serverID + "_" + backup; // must not share the paging directory
+   }
+
+   public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
+
+   protected Configuration createConfig(final int serverID) throws Exception {
+      return createConfig("localhost", serverID);
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+         setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+         setJournalDirectory(getJournalDir(serverID, false)).
+         setBindingsDirectory(getBindingsDir(serverID, false)).
+         setPagingDirectory(getPageDir(serverID, false)).
+         setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+         setJournalCompactMinFiles(0).
+         setJournalCompactPercentage(0).
+         setClusterPassword(CLUSTER_PASSWORD);
+
+      configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+      configuration.addAcceptorConfiguration("netty", newURIwithPort(hostAddress, port));
+      configuration.addConnectorConfiguration("netty-connector", newURIwithPort(hostAddress, port));
+
+      return configuration;
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+              setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+              setJournalDirectory(getJournalDir(serverID, false)).
+              setBindingsDirectory(getBindingsDir(serverID, false)).
+              setPagingDirectory(getPageDir(serverID, false)).
+              setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+              setJournalCompactMinFiles(0).
+              setJournalCompactPercentage(0).
+              setClusterPassword(CLUSTER_PASSWORD);
+
+      configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+      configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
+
+      return configuration;
+   }
+
+   //extraAcceptor takes form: "?name=value&name1=value ..."
+   protected Configuration createConfig(final int serverID, String extraAcceptorParams) throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+              setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(JournalType.NIO).
+              setJournalDirectory(getJournalDir(serverID, false)).
+              setBindingsDirectory(getBindingsDir(serverID, false)).
+              setPagingDirectory(getPageDir(serverID, false)).
+              setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+              setJournalCompactMinFiles(0).
+              setJournalCompactPercentage(0).
+              setClusterPassword(CLUSTER_PASSWORD);
+
+      configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+      String fullAcceptorUri = newURI(serverID) + extraAcceptorParams;
+      configuration.addAcceptorConfiguration("netty", fullAcceptorUri);
+
+      configuration.addConnectorConfiguration("netty-connector", newURI(serverID));
+      return configuration;
+   }
+
+   public void deployClusterConfiguration(Configuration config, Integer ... targetIDs) throws Exception {
+      StringBuffer stringBuffer = new StringBuffer();
+      String separator = "";
+      for (int x : targetIDs) {
+         stringBuffer.append(separator + newURI(x));
+         separator = ",";
+      }
+
+      String ccURI = "static://(" + stringBuffer.toString() + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1";
+
+      config.addClusterConfiguration("clusterCC", ccURI);
+   }
+
+   protected static String newURI(int serverID) {
+      return newURI("localhost", serverID);
+   }
+
+   protected static String newURI(String localhostAddress, int serverID) {
+      return "tcp://" + localhostAddress + ":" + (61616 + serverID);
+   }
+
+   protected static String newURIwithPort(String localhostAddress, int port) {
+      return "tcp://" + localhostAddress + ":" + port;
+   }
+
+   public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception {
+      return (JMSServerControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSServerObjectName(), JMSServerControl.class, mbeanServer);
+   }
+
+   public static JMSQueueControl createJMSQueueControl(final String name,
+                                                       final MBeanServer mbeanServer) throws Exception {
+      return (JMSQueueControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(name), JMSQueueControl.class, mbeanServer);
+   }
+
+   private static Object createProxy(final ObjectName objectName,
+                                     final Class mbeanInterface,
+                                     final MBeanServer mbeanServer) {
+      return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, objectName, mbeanInterface, false);
+   }
+
+   protected void shutDownClusterServers(EmbeddedJMS[] servers) throws Exception {
+      for (EmbeddedJMS server : servers) {
+         try {
+            server.stop();
+         }
+         catch (Throwable failure) {
+            failure.printStackTrace();
+         }
+      }
+   }
+
+   protected void shutDownNonClusterServers(EmbeddedJMS[] servers) throws Exception {
+      shutDownClusterServers(servers);
+   }
+
+   protected void setUpNonClusterServers(EmbeddedJMS[] servers) throws Exception {
+
+      Configuration[] serverCfgs = new Configuration[servers.length];
+      for (int i = 0; i < servers.length; i++) {
+         serverCfgs[i] = createConfig(i);
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl());
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         servers[i].start();
+      }
+   }
+
+   protected void setUpClusterServers(EmbeddedJMS[] servers) throws Exception {
+
+      Configuration[] serverCfgs = new Configuration[servers.length];
+      for (int i = 0; i < servers.length; i++) {
+         serverCfgs[i] = createConfig(i);
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         deployClusterConfiguration(serverCfgs[i], getTargets(servers.length, i));
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl());
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         servers[i].start();
+      }
+
+      for (int i = 0; i < servers.length; i++) {
+         Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length));
+      }
+   }
+
+   // Lists the IDs of every server in a cluster of the given size except
+   // the one identified by self.
+   private Integer[] getTargets(int total, int self) {
+      List<Integer> targets = new ArrayList<>();
+      for (int i = 0; i < total; i++) {
+         if (i != self) {
+            targets.add(i);
+         }
+      }
+      return targets.toArray(new Integer[0]);
+   }
+
+   public EmbeddedJMS createBroker() throws Exception {
+      Configuration config0 = createConfig(0);
+      EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      return newbroker;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index 34babf8..0843d3a 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import javax.net.SocketFactory;
 import org.apache.activemq.TransportLoggerSupport;
 import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.*;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -54,11 +56,10 @@ public class TcpTransportFactory extends TransportFactory {
       //here check broker, if no broker, we start one
       Map<String, String> params = URISupport.parseParameters(location);
       String brokerId = params.remove("invmBrokerId");
-      params.clear();
-      location = URISupport.createRemainingURI(location, params);
-      if (brokerService == null) {
+      URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP);
+      if (brokerService == null && !BrokerService.disableWrapper) {
 
-         ArtemisBrokerHelper.startArtemisBroker(location);
+         ArtemisBrokerHelper.startArtemisBroker(location1);
          brokerService = location.toString();
 
          if (brokerId != null) {
@@ -66,7 +67,8 @@ public class TcpTransportFactory extends TransportFactory {
             System.out.println("bound: " + brokerId);
          }
       }
-      return super.doConnect(location);
+      URI location2 = URISupport.createRemainingURI(location, params);
+      return super.doConnect(location2);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
deleted file mode 100644
index fd06de9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQInputStreamTest extends TestCase {
-
-   private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
-
-   private static final String BROKER_URL = "tcp://localhost:0";
-   private static final String DESTINATION = "destination";
-   private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
-
-   private BrokerService broker;
-   private String connectionUri;
-
-   @Override
-   public void setUp() throws Exception {
-      broker = new BrokerService();
-      broker.setUseJmx(false);
-      broker.setPersistent(false);
-      broker.setDestinations(new ActiveMQDestination[]{ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),});
-      broker.addConnector(BROKER_URL);
-      broker.start();
-      broker.waitUntilStarted();
-
-      //some internal api we don't implement
-      connectionUri = broker.getDefaultUri();
-   }
-
-   @Override
-   public void tearDown() throws Exception {
-      broker.stop();
-      broker.waitUntilStopped();
-   }
-
-   public void testInputStreamSetSyncSendOption() throws Exception {
-
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
-
-      OutputStream out = null;
-      try {
-         out = connection.createOutputStream(destination);
-
-         assertTrue(((ActiveMQOutputStream) out).isAlwaysSyncSend());
-
-         LOG.debug("writing...");
-         for (int i = 0; i < STREAM_LENGTH; ++i) {
-            out.write(0);
-         }
-         LOG.debug("wrote " + STREAM_LENGTH + " bytes");
-      }
-      finally {
-         if (out != null) {
-            out.close();
-         }
-      }
-
-      InputStream in = null;
-      try {
-         in = connection.createInputStream(destination);
-         LOG.debug("reading...");
-         int count = 0;
-         while (-1 != in.read()) {
-            ++count;
-         }
-         LOG.debug("read " + count + " bytes");
-      }
-      finally {
-         if (in != null) {
-            in.close();
-         }
-      }
-
-      connection.close();
-   }
-
-   public void testInputStreamMatchesDefaultChuckSize() throws Exception {
-
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Queue destination = session.createQueue(DESTINATION);
-
-      OutputStream out = null;
-      try {
-         out = connection.createOutputStream(destination);
-         LOG.debug("writing...");
-         for (int i = 0; i < STREAM_LENGTH; ++i) {
-            out.write(0);
-         }
-         LOG.debug("wrote " + STREAM_LENGTH + " bytes");
-      }
-      finally {
-         if (out != null) {
-            out.close();
-         }
-      }
-
-      InputStream in = null;
-      try {
-         in = connection.createInputStream(destination);
-         LOG.debug("reading...");
-         int count = 0;
-         while (-1 != in.read()) {
-            ++count;
-         }
-         LOG.debug("read " + count + " bytes");
-      }
-      finally {
-         if (in != null) {
-            in.close();
-         }
-      }
-
-      connection.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
new file mode 100644
index 0000000..f47620f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.TestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forces a test case to finish within an allotted time, so that a hanging
+ * test cannot stall the whole test run.
+ *
+ *
+ */
+
+public abstract class AutoFailTestSupport extends TestCase {
+    public static final int EXIT_SUCCESS = 0;
+    public static final int EXIT_ERROR = 1;
+    private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class);
+
+    private long maxTestTime = 5 * 60 * 1000; // 5 mins by default
+    private Thread autoFailThread;
+
+    private boolean verbose = true;
+    private boolean useAutoFail; // Disable auto fail by default
+    private AtomicBoolean isTestSuccess;
+
+    protected void setUp() throws Exception {
+        // Runs the auto fail thread before performing any setup
+        if (isAutoFail()) {
+            startAutoFailThread();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+
+        // Stops the auto fail thread only after performing any clean up
+        stopAutoFailThread();
+    }
+
+    /**
+     * Manually starts the auto fail thread (and turns auto fail on). The thread
+     * sleeps for getMaxTestTime() ms; if stopAutoFailThread() has not marked the
+     * test successful by the time it wakes or is interrupted, it dumps all thread
+     * stacks and exits the JVM unless the disableSystemExit property is set.
+     */
+    public void startAutoFailThread() {
+        setAutoFail(true);
+        isTestSuccess = new AtomicBoolean(false);
+        autoFailThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    // Wait for test to finish successfully
+                    Thread.sleep(getMaxTestTime());
+                } catch (InterruptedException ignored) {
+                    // This usually means the test was successful
+                } finally {
+                    // Check if the test was able to tear down successfully,
+                    // which usually means it has finished its run.
+                    if (!isTestSuccess.get()) {
+                        LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms.");
+                        dumpAllThreads(getName());
+                        if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) {
+                            System.exit(EXIT_ERROR);
+                        } else {
+                            LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config");
+                        }
+                    }
+                }
+            }
+        }, "AutoFailThread");
+
+        if (verbose) {
+            LOG.info("Starting auto fail thread...");
+        }
+
+        // The start message is logged once, and only when verbose is set.
+        autoFailThread.start();
+    }
+
+    /**
+     * Manually stops the auto fail thread: marks the test as successful and
+     * interrupts the auto fail thread. Does nothing unless auto fail is enabled
+     * and the thread is still alive.
+     */
+    public void stopAutoFailThread() {
+        if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) {
+            isTestSuccess.set(true);
+
+            if (verbose) {
+                LOG.info("Stopping auto fail thread...");
+            }
+
+            // The success flag is set before the interrupt so the thread sees it on wake-up.
+            autoFailThread.interrupt();
+        }
+    }
+
+    /**
+     * Sets the auto fail value. As a rule, this should be called before any
+     * setup method runs, so that the auto fail thread is started automatically
+     * in the setup method of the test case.
+     *
+     * @param val
+     */
+    public void setAutoFail(boolean val) {
+        this.useAutoFail = val;
+    }
+
+    public boolean isAutoFail() {
+        return this.useAutoFail;
+    }
+
+    /**
+     * The assigned value will only be reflected when the auto fail thread has
+     * started its run. Value is in milliseconds.
+     *
+     * @param val
+     */
+    public void setMaxTestTime(long val) {
+        this.maxTestTime = val;
+    }
+
+    public long getMaxTestTime() {
+        return this.maxTestTime;
+    }
+
+
+    public static void dumpAllThreads(String prefix) {
+        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
+        for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
+            System.err.println(prefix + " " + stackEntry.getKey());
+            for(StackTraceElement element : stackEntry.getValue()) {
+                System.err.println("     " + element);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
index 5e5b993..b8397e2 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
@@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase {
 
       try {
          connection.setClientID("test");
-         // fail("Should have received JMSException");
+         fail("Should have received JMSException");
       }
       catch (JMSException e) {
       }
 
-      connection.cleanup();
+      connection.doCleanup(true);
       connection.setClientID("test");
 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
       try {
          connection.setClientID("test");
-         // fail("Should have received JMSException");
+         fail("Should have received JMSException");
       }
       catch (JMSException e) {
       }
    }
 
+   public void testChangeClientIDDenied() throws JMSException {
+
+      connection.setClientID("test");
+      connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+
+      connection.cleanup();
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+
+      connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
index fa58ebe..b8dea70 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
@@ -16,15 +16,23 @@
  */
 package org.apache.activemq;
 
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.rules.TemporaryFolder;
 import org.springframework.jms.core.JmsTemplate;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import java.io.File;
 
 /**
  * A useful base class which creates and closes an embedded broker
@@ -32,17 +40,26 @@ import javax.jms.Destination;
 public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
 
    protected BrokerService broker;
-   // protected String bindAddress = "tcp://localhost:61616";
-   protected String bindAddress = "vm://localhost";
+   protected EmbeddedJMS artemisBroker;
+   protected String bindAddress = "tcp://localhost:61616";
    protected ConnectionFactory connectionFactory;
    protected boolean useTopic;
    protected ActiveMQDestination destination;
    protected JmsTemplate template;
 
-   @Override
+   public TemporaryFolder temporaryFolder;
+
+   public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
+
    protected void setUp() throws Exception {
-      if (broker == null) {
-         broker = createBroker();
+      BrokerService.disableWrapper = true;
+      File tmpRoot = new File("./target/tmp");
+      tmpRoot.mkdirs();
+      temporaryFolder = new TemporaryFolder(tmpRoot);
+      temporaryFolder.create();
+
+      if (artemisBroker == null) {
+         artemisBroker = createArtemisBroker();
       }
       startBroker();
 
@@ -58,13 +75,42 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
 
    @Override
    protected void tearDown() throws Exception {
-      if (broker != null) {
+      if (artemisBroker != null) {
          try {
-            broker.stop();
+            artemisBroker.stop();
          }
          catch (Exception e) {
          }
       }
+      temporaryFolder.delete();
+   }
+
+   public String getTmp() {
+      return getTmpFile().getAbsolutePath();
+   }
+
+   public File getTmpFile() {
+      return temporaryFolder.getRoot();
+   }
+
+   protected String getJournalDir(int serverID, boolean backup) {
+      return getTmp() + "/journal_" + serverID + "_" + backup;
+   }
+
+   protected String getBindingsDir(int serverID, boolean backup) {
+      return getTmp() + "/binding_" + serverID + "_" + backup;
+   }
+
+   protected String getPageDir(int serverID, boolean backup) {
+      return getTmp() + "/paging_" + serverID + "_" + backup;
+   }
+
+   protected String getLargeMessagesDir(int serverID, boolean backup) {
+      return getTmp() + "/large-messages_" + serverID + "_" + backup; // own directory, distinct from getPageDir()
+   }
+
+   protected static String newURI(String localhostAddress, int serverID) {
+      return "tcp://" + localhostAddress + ":" + (61616 + serverID);
    }
 
    /**
@@ -114,20 +160,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
       return new ActiveMQConnectionFactory(bindAddress);
    }
 
-   /**
-    * Factory method to create a new broker
-    *
-    * @throws Exception
-    */
+
+   public EmbeddedJMS createArtemisBroker() throws Exception {
+      Configuration config0 = createConfig("localhost", 0);
+      EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      return newbroker;
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+              setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+              setJournalDirectory(getJournalDir(serverID, false)).
+              setBindingsDirectory(getBindingsDir(serverID, false)).
+              setPagingDirectory(getPageDir(serverID, false)).
+              setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+              setJournalCompactMinFiles(0).
+              setJournalCompactPercentage(0).
+              setClusterPassword(CLUSTER_PASSWORD);
+
+      configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+      configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
+
+      return configuration;
+   }
+
+   //we keep this because some other tests uses it.
+   //we'll delete this when those tests are dealt with.
    protected BrokerService createBroker() throws Exception {
       BrokerService answer = new BrokerService();
       answer.setPersistent(isPersistent());
+      answer.getManagementContext().setCreateConnector(false);
       answer.addConnector(bindAddress);
       return answer;
    }
 
    protected void startBroker() throws Exception {
-      broker.start();
+      artemisBroker.start();
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
new file mode 100755
index 0000000..b7c2e94
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import org.apache.activemq.test.JmsResourceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class);
+
+    /**
+     * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        JmsResourceProvider p = new JmsResourceProvider();
+        p.setTopic(false);
+        return p;
+    }
+
+    /**
+     * Tests that if the connection gets reset, the messages will still be
+     * received.
+     *
+     * @throws Exception
+     */
+    public void testReceiveTwoThenCloseConnection() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(2000);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(2000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+
+        // Close and reopen connection.
+        reconnect();
+
+        // Consume again.. the previous message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(2000);
+        assertNotNull("Should have re-received the first message again!", message);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        messages.add(message);
+        assertEquals(outbound[1], message);
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Tests sending and receiving messages with two sessions(one for producing
+     * and another for consuming).
+     *
+     * @throws Exception
+     */
+    public void testSendReceiveInSeperateSessionTest() throws Exception {
+        session.close();
+        int batchCount = 10;
+
+        for (int i = 0; i < batchCount; i++) {
+            // Session that sends messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageProducer producer = resourceProvider.createProducer(session, destination);
+                // consumer = resourceProvider.createConsumer(session,
+                // destination);
+                beginTx();
+                producer.send(session.createTextMessage("Test Message: " + i));
+                commitTx();
+                session.close();
+            }
+
+            // Session that consumes messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
+
+                beginTx();
+                TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+                assertNotNull("Received only " + i + " messages in batch ", message);
+                assertEquals("Test Message: " + i, message.getText());
+
+                commitTx();
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Tests the queue browser. Browses the messages then the consumer tries to
+     * receive them. The messages should still be in the queue even when it was
+     * browsed.
+     *
+     * @throws Exception
+     */
+    public void testReceiveBrowseReceive() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        producer.send(outbound[2]);
+        commitTx();
+
+        // Get the first.
+        beginTx();
+        assertEquals(outbound[0], consumer.receive(1000));
+        consumer.close();
+        commitTx();
+
+        beginTx();
+        QueueBrowser browser = session.createBrowser((Queue)destination);
+        Enumeration enumeration = browser.getEnumeration();
+
+        // browse the second
+        assertTrue("should have received the second message", enumeration.hasMoreElements());
+        assertEquals(outbound[1], (Message)enumeration.nextElement());
+
+        // browse the third.
+        assertTrue("Should have received the third message", enumeration.hasMoreElements());
+        assertEquals(outbound[2], (Message)enumeration.nextElement());
+
+        LOG.info("Check for more...");
+        // There should be no more.
+        boolean tooMany = false;
+        while (enumeration.hasMoreElements()) {
+            LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText());
+            tooMany = true;
+        }
+        assertFalse(tooMany);
+        LOG.info("close browser...");
+        browser.close();
+
+        LOG.info("reopen and consume...");
+        // Re-open the consumer.
+        consumer = resourceProvider.createConsumer(session, destination);
+        // Receive the second.
+        assertEquals(outbound[1], consumer.receive(1000));
+        // Receive the third.
+        assertEquals(outbound[2], consumer.receive(1000));
+        consumer.close();
+
+        commitTx();
+    }
+
+    public void testCloseConsumer() throws Exception {
+        Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0");
+        producer = session.createProducer(dest);
+        beginTx();
+        producer.send(session.createTextMessage("message 1"));
+        producer.send(session.createTextMessage("message 2"));
+        commitTx();
+
+        beginTx();
+        consumer = session.createConsumer(dest);
+        Message message1 = consumer.receive(1000);
+        assertNotNull(message1); // check before dereferencing, so a timeout fails the assertion instead of throwing NPE
+        String text1 = ((TextMessage)message1).getText();
+        assertEquals("message 1", text1);
+
+        consumer.close();
+
+        consumer = session.createConsumer(dest);
+
+        Message message2 = consumer.receive(1000);
+        assertNotNull(message2); // check before dereferencing, as above
+        String text2 = ((TextMessage)message2).getText();
+        assertEquals("message 2", text2);
+        commitTx();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d4d0c8/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
new file mode 100755
index 0000000..dfcf302
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsResourceProvider;
+import org.apache.activemq.test.TestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
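+/**
+ * Abstract base class for tests that exercise transacted JMS sessions.
+ * {@code setUp()} starts the broker returned by {@code createBroker()} and
+ * opens a connection, session, producer and consumer through the
+ * {@link JmsResourceProvider} supplied by the concrete subclass. The class
+ * also serves as the {@link MessageListener} used by
+ * {@code testMessageListener()}.
+ */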
+public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
+    private static final int MESSAGE_COUNT = 5;
+    private static final String MESSAGE_TEXT = "message";
+
+    protected ConnectionFactory connectionFactory;
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected JmsResourceProvider resourceProvider;
+    protected Destination destination;
+    protected int batchCount = 10;
+    protected int batchSize = 20;
+    protected BrokerService broker;
+
+    // State for the message listener test. onMessage() appends to these lists
+    // and flips resendPhase while the test thread polls them in
+    // waitReceiveUnack()/waitReceiveAck(), so the lists are synchronized and
+    // the flag is volatile to make those updates visible across threads.
+    private final List<Message> unackMessages = java.util.Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
+    private final List<Message> ackMessages = java.util.Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
+    private volatile boolean resendPhase;
+
+    /**
+     * Creates the test by delegating to the no-argument superclass
+     * constructor.
+     */
+    public JmsTransactionTestSupport() {
+        super();
+    }
+
+    /**
+     * Creates the test, passing {@code name} to the superclass constructor.
+     *
+     * @param name value forwarded to the superclass constructor
+     */
+    public JmsTransactionTestSupport(String name) {
+        super(name);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Starts the broker, then prepares the resource provider, connection
+     * factory and (via reconnect()) the connection, session, producer and
+     * consumer used by the tests.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // Start the broker and block until it reports started before any
+        // client resource is created.
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        resourceProvider = getJmsResourceProvider();
+        // 'topic' is not declared in this class; presumably a field inherited
+        // from TestSupport -- TODO confirm.
+        topic = resourceProvider.isTopic();
+        // We will be using transacted sessions.
+        setSessionTransacted();
+        connectionFactory = newConnectionFactory();
+        // Opens the connection and creates session, destination, producer and
+        // consumer.
+        reconnect();
+    }
+
+    /**
+     * Marks the resource provider as transacted. Called from setUp() before
+     * the connection factory and session are created.
+     */
+    protected void setSessionTransacted() {
+        resourceProvider.setTransacted(true);
+    }
+
+    /**
+     * Obtains the connection factory from the resource provider.
+     *
+     * @return the connection factory used by reconnect()
+     * @throws Exception if the provider fails to create the factory
+     */
+    protected ConnectionFactory newConnectionFactory() throws Exception {
+        return resourceProvider.createConnectionFactory();
+    }
+
+    /**
+     * Hook invoked by the tests at the start of each transaction. This
+     * implementation does nothing.
+     *
+     * @throws Exception declared so that overriding implementations may throw
+     */
+    protected void beginTx() throws Exception {
+        //no-op for local tx
+    }
+
+    /**
+     * Commits the current transaction by calling {@code commit()} on the
+     * session.
+     *
+     * @throws Exception if the commit fails
+     */
+    protected void commitTx() throws Exception {
+        session.commit();
+    }
+
+    /**
+     * Rolls back the current transaction by calling {@code rollback()} on the
+     * session.
+     *
+     * @throws Exception if the rollback fails
+     */
+    protected void rollbackTx() throws Exception {
+        session.rollback();
+    }
+
+    /**
+     * Creates the broker used by the tests from the URI
+     * {@code broker://()/localhost?persistent=false}. The broker is not
+     * started here; setUp() starts it.
+     *
+     * @return the newly created broker
+     * @throws Exception if the broker cannot be created
+     * @throws URISyntaxException if the broker URI cannot be parsed
+     */
+    protected BrokerService createBroker() throws Exception, URISyntaxException {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Closes the session and connection and stops the broker. Each resource is
+     * released in its own try block so that a failure (or a null reference
+     * left behind by a failed setUp) in one step cannot prevent the following
+     * steps from running.
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        LOG.info("Closing down connection");
+
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } catch (Exception e) {
+            // Best effort: keep going, but record the cause.
+            LOG.info("Caught exception while closing resources.", e);
+        } finally {
+            session = null;
+        }
+
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            // Best effort: keep going, but record the cause.
+            LOG.info("Caught exception while closing resources.", e);
+        } finally {
+            connection = null;
+        }
+
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } catch (Exception e) {
+            LOG.info("Caught exception while shutting down the Broker", e);
+        } finally {
+            broker = null;
+        }
+
+        LOG.info("Connection closed.");
+    }
+
+    /**
+     * Supplies the resource provider that setUp() and reconnect() use to
+     * create the connection factory, connection, session, destination,
+     * producer and consumer.
+     *
+     * @return the resource provider for the concrete test
+     */
+    protected abstract JmsResourceProvider getJmsResourceProvider();
+
+    /**
+     * Sends {@code batchCount} batches of {@code batchSize} messages, commits
+     * each batch, and validates that every message of the batch is received in
+     * the following transaction.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendReceiveTransactedBatches() throws Exception {
+
+        TextMessage message = session.createTextMessage("Batch Message");
+        for (int j = 0; j < batchCount; j++) {
+            LOG.info("Producing batch " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                producer.send(message);
+            }
+            messageSent();
+            commitTx();
+            LOG.info("Consuming batch " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                // Keep the received message in its own variable: overwriting
+                // the outbound reference would make the next batch re-send a
+                // message that came back from the broker.
+                TextMessage received = (TextMessage)consumer.receive(1000 * 5);
+                assertNotNull("Received only " + i + " messages in batch " + j, received);
+                assertEquals("Batch Message", received.getText());
+            }
+
+            commitTx();
+        }
+    }
+
+    /**
+     * Hook called by testSendReceiveTransactedBatches() after a batch has been
+     * sent and before it is committed. This implementation does nothing.
+     *
+     * @throws Exception declared so that overriding implementations may throw
+     */
+    protected void messageSent() throws Exception {
+    }
+
+    /**
+     * Commits one send, rolls back a second and commits a third, then
+     * validates that the consumer sees exactly the two committed messages.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendRollback() throws Exception {
+        Message first = session.createTextMessage("First Message");
+        Message second = session.createTextMessage("Second Message");
+        Message[] outbound = {first, second};
+
+        // transaction 1: committed send
+        beginTx();
+        producer.send(first);
+        commitTx();
+
+        // transaction 2: this send is expected to vanish with the rollback
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        rollbackTx();
+
+        // transaction 3: committed send
+        beginTx();
+        producer.send(second);
+        commitTx();
+
+        // transaction 4: consume both expected messages
+        beginTx();
+        List<Message> received = new ArrayList<Message>();
+        final long[] timeouts = {1000, 4000};
+        for (int i = 0; i < timeouts.length; i++) {
+            LOG.info("About to consume message " + (i + 1));
+            Message message = consumer.receive(timeouts[i]);
+            received.add(message);
+            LOG.info("Received: " + message);
+        }
+        commitTx();
+
+        // only the committed messages may have been delivered
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Calls {@code acknowledge()} on a sent message both before and after the
+     * commit and validates that the message is still delivered. The original
+     * note refers to spec section 3.6: acknowledging a message when
+     * acknowledgement is automatic has no effect.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testAckMessageInTx() throws Exception {
+        Message sent = session.createTextMessage("First Message");
+        Message[] outbound = {sent};
+
+        // send, acknowledging once inside and once outside the transaction
+        beginTx();
+        producer.send(sent);
+        sent.acknowledge();
+        commitTx();
+        sent.acknowledge();
+
+        // consume the message
+        beginTx();
+        List<Message> received = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        received.add(message);
+        LOG.info("Received: " + message);
+        commitTx();
+
+        // the message must have been delivered despite the acknowledge() calls
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+    }
+
+    /**
+     * Commits one send, leaves a second send uncommitted while the session is
+     * replaced through reconnectSession(), commits a third send on the new
+     * session, and validates that only the two committed messages are
+     * consumed.
+     *
+     * This test only works with local transactions, not xa.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendSessionClose() throws Exception {
+        Message first = session.createTextMessage("First Message");
+        Message second = session.createTextMessage("Second Message");
+        Message[] outbound = {first, second};
+
+        // committed send
+        beginTx();
+        producer.send(first);
+        commitTx();
+
+        // uncommitted send; the session is closed before any commit
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+
+        reconnectSession();
+
+        // committed send on the fresh session (no beginTx() here, as before)
+        producer.send(second);
+        commitTx();
+
+        // consume the first message
+        List<Message> received = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        received.add(message);
+        LOG.info("Received: " + message);
+
+        // consume the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        received.add(message);
+        LOG.info("Received: " + message);
+        commitTx();
+
+        // the uncommitted message must not be among the received ones
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Commits one send, leaves a second send uncommitted while the session is
+     * closed and the connection is replaced through reconnect(), commits a
+     * third send, and validates that only the two committed messages are
+     * consumed.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendSessionAndConnectionClose() throws Exception {
+        Message first = session.createTextMessage("First Message");
+        Message second = session.createTextMessage("Second Message");
+        Message[] outbound = {first, second};
+
+        // committed send
+        beginTx();
+        producer.send(first);
+        commitTx();
+
+        // uncommitted send; consumer and session are closed before any commit
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+        session.close();
+
+        reconnect();
+
+        // committed send on the fresh connection
+        beginTx();
+        producer.send(second);
+        commitTx();
+
+        // consume the first message
+        List<Message> received = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        received.add(message);
+        LOG.info("Received: " + message);
+
+        // consume the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        received.add(message);
+        LOG.info("Received: " + message);
+        commitTx();
+
+        // the uncommitted message must not be among the received ones
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Sends two messages, consumes and commits the first, consumes the second
+     * and rolls back, then validates that the second message is delivered
+     * again.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveRollback() throws Exception {
+        Message first = session.createTextMessage("First Message");
+        Message second = session.createTextMessage("Second Message");
+        Message[] outbound = {first, second};
+
+        // drain anything left over from previous test runs
+        beginTx();
+        Message stale = consumer.receive(1000);
+        while (stale != null) {
+            stale = consumer.receive(1000);
+        }
+        commitTx();
+
+        // send both messages in one transaction
+        beginTx();
+        producer.send(first);
+        producer.send(second);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        // consume and commit the first message
+        List<Message> received = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(1000);
+        received.add(message);
+        assertEquals(first, message);
+        commitTx();
+
+        // consume the second message, then roll back so it is handed out again
+        beginTx();
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(second, message);
+        rollbackTx();
+
+        // the rolled-back message should be redelivered
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the message again!", message);
+        received.add(message);
+        commitTx();
+
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sends two messages, consumes both in one transaction and rolls back,
+     * then validates that both messages are delivered again, in order, and
+     * that nothing else is pending.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveTwoThenRollback() throws Exception {
+        Message first = session.createTextMessage("First Message");
+        Message second = session.createTextMessage("Second Message");
+        Message[] outbound = {first, second};
+
+        // drain anything left over from previous test runs
+        beginTx();
+        Message stale = consumer.receive(1000);
+        while (stale != null) {
+            stale = consumer.receive(1000);
+        }
+        commitTx();
+
+        // send both messages in one transaction
+        beginTx();
+        producer.send(first);
+        producer.send(second);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        // consume both messages, then roll the transaction back
+        beginTx();
+        Message message = consumer.receive(1000);
+        assertEquals(first, message);
+
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(second, message);
+        rollbackTx();
+
+        // both messages should be redelivered in the original order
+        List<Message> received = new ArrayList<Message>();
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the first message again!", message);
+        received.add(message);
+        assertEquals(first, message);
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        received.add(message);
+        assertEquals(second, message);
+
+        // and nothing further may be waiting
+        assertNull(consumer.receiveNoWait());
+        commitTx();
+
+        Message[] inbound = received.toArray(new Message[received.size()]);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sets the prefetch policy to one, sends four messages in a single
+     * transaction and validates that all four are received in a following
+     * transaction.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendReceiveWithPrefetchOne() throws Exception {
+        setPrefetchToOne();
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
+                                            session.createTextMessage("Fourth Message")};
+
+        // sends all messages in one transaction
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+        commitTx();
+
+        // receives every message in one transaction
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            // Log the actual ordinal; the message number used to be hard-coded to 1.
+            LOG.info("About to consume message " + (i + 1));
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            LOG.info("Received: " + message);
+        }
+
+        commitTx();
+    }
+
+    /**
+     * Runs {@link #testReceiveTwoThenRollback()} five times in a row within
+     * the same test (same connection and session) to validate that rolled-back
+     * messages are redelivered on every repetition.
+     *
+     * @throws Exception if any repetition fails
+     */
+    public void testReceiveTwoThenRollbackManyTimes() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            testReceiveTwoThenRollback();
+        }
+    }
+
+    /**
+     * Runs {@link #testSendRollback()} after setting the prefetch policy to
+     * one, validating that a rolled-back send is not consumed.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testSendRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testSendRollback();
+    }
+
+    /**
+     * Runs {@link #testReceiveRollback()} after setting the prefetch policy to
+     * one, validating that a rolled-back message is redelivered.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testReceiveRollback();
+    }
+
+    /**
+     * Tests if the messages can still be received if the consumer is closed
+     * (session is not closed): the first message is received, the consumer is
+     * closed before the commit, and a new consumer must then receive the
+     * second message rather than the first.
+     *
+     * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
+     */
+    public void testCloseConsumerBeforeCommit() throws Exception {
+        TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+        while (consumer.receiveNoWait() != null) {
+        }
+
+        commitTx();
+
+        // sends the messages
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        beginTx();
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        // Fail with a clear assertion instead of a NullPointerException when
+        // the receive times out.
+        assertNotNull("Should have received the first message", message);
+        assertEquals(outbound[0].getText(), message.getText());
+        // Close the consumer before the commit. This should not cause the
+        // received message
+        // to rollback.
+        consumer.close();
+        commitTx();
+
+        // Create a new consumer
+        consumer = resourceProvider.createConsumer(session, destination);
+        LOG.info("Created consumer: " + consumer);
+
+        beginTx();
+        message = (TextMessage)consumer.receive(1000);
+        assertNotNull("Should have received the second message", message);
+        assertEquals(outbound[1].getText(), message.getText());
+        commitTx();
+    }
+
+    /**
+     * Sends an object message carrying a list, mutates the received body and
+     * tries to change a property, rolls back, and validates that the
+     * redelivered message still carries the original property and body.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+        ArrayList<String> payload = new ArrayList<String>();
+        payload.add("First");
+        Message outbound = session.createObjectMessage(payload);
+        outbound.setStringProperty("foo", "abc");
+
+        beginTx();
+        producer.send(outbound);
+        commitTx();
+
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message delivered = consumer.receive(5000);
+
+        List<String> firstBody = assertReceivedObjectMessageWithListBody(delivered);
+
+        // setting a property on the received message is expected to be rejected
+        try {
+            delivered.setStringProperty("foo", "def");
+            fail("Cannot change properties of the object!");
+        } catch (JMSException expected) {
+            LOG.info("Caught expected exception: " + expected, expected);
+        }
+        // tamper with the body we were handed, then roll back
+        firstBody.clear();
+        firstBody.add("This should never be seen!");
+        rollbackTx();
+
+        // the redelivery is checked against the original content again
+        beginTx();
+        delivered = consumer.receive(5000);
+        List<String> secondBody = assertReceivedObjectMessageWithListBody(delivered);
+        assertNotSame("Second call should return a different body", secondBody, firstBody);
+        commitTx();
+    }
+
+    /**
+     * Asserts that {@code message} is a non-null {@link ObjectMessage} whose
+     * "foo" property is "abc" and whose body is a one-element list containing
+     * "First".
+     *
+     * @param message the received message to check
+     * @return the list carried as the message body
+     * @throws JMSException if the property or the body cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+        assertNotNull("Should have received a message!", message);
+        assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+        List<String> payload = (List<String>)((ObjectMessage)message).getObject();
+        LOG.info("Received body: " + payload);
+
+        assertEquals("Size of list should be 1", 1, payload.size());
+        assertEquals("element 0 of list", "First", payload.get(0));
+        return payload;
+    }
+
+    /**
+     * Recreates the connection: closes the current one if present, creates a
+     * new connection from the resource provider, rebuilds the session,
+     * destination, producer and consumer through reconnectSession(), and
+     * starts the connection.
+     *
+     * @throws Exception if closing, creating or starting the connection fails
+     */
+    protected void reconnect() throws Exception {
+
+        if (connection != null) {
+            // Close the prev connection.
+            connection.close();
+        }
+        // Drop the old session reference so reconnectSession() does not try to
+        // close it again.
+        session = null;
+        connection = resourceProvider.createConnection(connectionFactory);
+        reconnectSession();
+        connection.start();
+    }
+
+    /**
+     * Recreates the session on the current connection: closes the existing
+     * session if there is one, then creates a new session together with the
+     * destination, producer and consumer used by the tests.
+     *
+     * @throws javax.jms.JMSException if closing or creating any of the
+     *         resources fails
+     */
+    protected void reconnectSession() throws JMSException {
+        if (session != null) {
+            session.close();
+        }
+
+        session = resourceProvider.createSession(connection);
+        destination = resourceProvider.createDestination(session, getSubject());
+        producer = resourceProvider.createProducer(session, destination);
+        consumer = resourceProvider.createConsumer(session, destination);
+    }
+
+    /**
+     * Sets every prefetch limit of the connection's prefetch policy (queue,
+     * topic, durable topic and optimized durable topic) to one.
+     *
+     * NOTE(review): the tests call this after setUp() has already created the
+     * consumer; confirm that the changed policy applies to existing consumers.
+     */
+    protected void setPrefetchToOne() {
+        ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(1);
+        prefetchPolicy.setTopicPrefetch(1);
+        prefetchPolicy.setDurableTopicPrefetch(1);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
+    }
+
+    /**
+     * Returns the prefetch policy of the current connection. The connection is
+     * cast to {@code ActiveMQConnection}, so this fails with a
+     * ClassCastException for any other connection type.
+     *
+     * @return the prefetch policy of the current connection
+     */
+    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return ((ActiveMQConnection)connection).getPrefetchPolicy();
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} messages and consumes them through this
+     * object acting as message listener: the first delivery round is rolled
+     * back by onMessage(), the second one is committed, and afterwards nothing
+     * more may be delivered.
+     *
+     * This test won't work with xa tx so no beginTx() has been added.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testMessageListener() throws Exception {
+        // send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(session.createTextMessage(MESSAGE_TEXT + i));
+        }
+        commitTx();
+        consumer.setMessageListener(this);
+        // wait receive
+        waitReceiveUnack();
+        // expected value first, so a failure message reports the right numbers
+        assertEquals(MESSAGE_COUNT, unackMessages.size());
+        // resend phase
+        waitReceiveAck();
+        assertEquals(MESSAGE_COUNT, ackMessages.size());
+        // should no longer re-receive
+        consumer.setMessageListener(null);
+        assertNull(consumer.receive(500));
+        reconnect();
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        if (!resendPhase) {
+            unackMessages.add(message);
+            if (unackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    rollbackTx();
+                    resendPhase = true;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } else {
+            ackMessages.add(message);
+            if (ackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    commitTx();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    /**
+     * Polls up to 100 times, sleeping 100 ms between polls, until the resend
+     * phase has started; fails the test if it has not started by then.
+     *
+     * @throws Exception if the sleep is interrupted
+     */
+    private void waitReceiveUnack() throws Exception {
+        int polls = 0;
+        while (polls < 100 && !resendPhase) {
+            Thread.sleep(100);
+            polls++;
+        }
+        assertTrue(resendPhase);
+    }
+
+    /**
+     * Polls up to 100 times, sleeping 100 ms between polls, until at least
+     * {@code MESSAGE_COUNT} messages have been collected in the resend phase;
+     * fails the test if fewer have arrived by then.
+     *
+     * @throws Exception if the sleep is interrupted
+     */
+    private void waitReceiveAck() throws Exception {
+        int polls = 0;
+        while (polls < 100 && ackMessages.size() < MESSAGE_COUNT) {
+            Thread.sleep(100);
+            polls++;
+        }
+        assertTrue(ackMessages.size() >= MESSAGE_COUNT);
+    }
+}