You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 02:53:43 UTC

[19/60] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
deleted file mode 100644
index 882105b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AMQ4083Test {
-
-   private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class);
-   private static BrokerService brokerService;
-   private static String BROKER_ADDRESS = "tcp://localhost:0";
-   private static String TEST_QUEUE = "testQueue";
-   private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
-
-   private final int messageCount = 100;
-
-   private String connectionUri;
-   private String[] data;
-
-   @Before
-   public void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(true);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-      brokerService.start();
-      brokerService.waitUntilStarted();
-
-      data = new String[messageCount];
-
-      for (int i = 0; i < messageCount; i++) {
-         data[i] = "Text for message: " + i + " at " + new Date();
-      }
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-
-   @Test
-   public void testExpiredMsgsBeforeNonExpired() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-      }
-
-      // and send one that doesn't expire to we can ack it.
-      producer.send(session.createTextMessage());
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(101, queueView.getInFlightCount());
-
-      consumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertEquals(0, queueView.getInFlightCount());
-
-      for (int i = 0; i < 200; i++) {
-         producer.send(session.createTextMessage());
-      }
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   @Test
-   public void testExpiredMsgsBeforeNonExpiredWithTX() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-
-      final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-      }
-
-      // and send one that doesn't expire to we can ack it.
-      producer.send(session.createTextMessage());
-      session.commit();
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(101, queueView.getInFlightCount());
-
-      consumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               session.commit();
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertEquals(0, queueView.getInFlightCount());
-
-      for (int i = 0; i < 200; i++) {
-         producer.send(session.createTextMessage());
-      }
-      session.commit();
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   @Test
-   public void testExpiredMsgsInterleavedWithNonExpired() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 200; i++) {
-
-         if ((i % 2) == 0) {
-            producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-         }
-         else {
-            producer.send(session.createTextMessage());
-         }
-      }
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(200, queueView.getInFlightCount());
-
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            try {
-               LOG.debug("Acking message: {}", message);
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      for (int i = 0; i < 200; i++) {
-         producer.send(session.createTextMessage());
-      }
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   @Test
-   public void testExpiredMsgsInterleavedWithNonExpiredCumulativeAck() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 200; i++) {
-
-         if ((i % 2) == 0) {
-            producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-         }
-         else {
-            producer.send(session.createTextMessage());
-         }
-      }
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(200, queueView.getInFlightCount());
-
-      final AtomicInteger msgCount = new AtomicInteger();
-
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            try {
-               if (msgCount.incrementAndGet() == 100) {
-                  LOG.debug("Acking message: {}", message);
-                  message.acknowledge();
-               }
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      // Now we just ack each and see if our counters come out right in the end.
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            try {
-               LOG.debug("Acking message: {}", message);
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      for (int i = 0; i < 200; i++) {
-         producer.send(session.createTextMessage());
-      }
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   @Test
-   public void testExpiredBatchBetweenNonExpiredMessages() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // Send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-      }
-
-      // and send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(102, queueView.getInFlightCount());
-
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            try {
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      for (int i = 0; i < 200; i++) {
-         producer.send(session.createTextMessage());
-      }
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      }));
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   @Test
-   public void testConsumeExpiredQueueAndDlq() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      Connection connection = factory.createConnection();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer producerNormal = session.createProducer(queue);
-      MessageProducer producerExpire = session.createProducer(queue);
-      producerExpire.setTimeToLive(500);
-
-      MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ"));
-      connection.start();
-
-      Connection consumerConnection = factory.createConnection();
-      ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
-      prefetchPolicy.setAll(10);
-      ((ActiveMQConnection) consumerConnection).setPrefetchPolicy(prefetchPolicy);
-      Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(queue);
-      consumerConnection.start();
-
-      String msgBody = new String(new byte[20 * 1024]);
-      for (int i = 0; i < data.length; i++) {
-         Message message = session.createTextMessage(msgBody);
-         producerExpire.send(queue, message);
-      }
-
-      for (int i = 0; i < data.length; i++) {
-         Message message = session.createTextMessage(msgBody);
-         producerNormal.send(queue, message);
-      }
-
-      ArrayList<Message> messages = new ArrayList<>();
-      Message received;
-      while ((received = consumer.receive(1000)) != null) {
-         messages.add(received);
-         if (messages.size() == 1) {
-            TimeUnit.SECONDS.sleep(1);
-         }
-         received.acknowledge();
-      }
-
-      assertEquals("got messages", messageCount + 1, messages.size());
-
-      ArrayList<Message> dlqMessages = new ArrayList<>();
-      while ((received = dlqConsumer.receive(1000)) != null) {
-         dlqMessages.add(received);
-      }
-
-      assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
-      final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
-      final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-      return proxy;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
deleted file mode 100644
index e894b70..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4092Test extends TestCase {
-
-   private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class);
-
-   static final String QUEUE_NAME = "TEST";
-
-   // increase limits to expedite failure
-   static final int NUM_TO_SEND_PER_PRODUCER = 1000; // 10000
-   static final int NUM_PRODUCERS = 5; // 40
-
-   static final ActiveMQQueue[] DESTINATIONS = new ActiveMQQueue[]{new ActiveMQQueue("A"), new ActiveMQQueue("B")
-      // A/B seems to be sufficient for concurrentStoreAndDispatch=true
-   };
-
-   static final boolean debug = false;
-
-   private BrokerService brokerService;
-
-   private ActiveMQQueue destination;
-   private HashMap<Thread, Throwable> exceptions = new HashMap<>();
-   private ExceptionListener exceptionListener = new ExceptionListener() {
-      @Override
-      public void onException(JMSException exception) {
-         exception.printStackTrace();
-         exceptions.put(Thread.currentThread(), exception);
-      }
-   };
-
-   @Override
-   protected void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
-      brokerService.addConnector("tcp://localhost:0");
-      brokerService.start();
-      destination = new ActiveMQQueue();
-      destination.setCompositeDestinations(DESTINATIONS);
-      Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-         @Override
-         public void uncaughtException(Thread t, Throwable e) {
-            exceptions.put(t, e);
-         }
-      });
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      // Stop any running threads.
-      brokerService.stop();
-   }
-
-   public void testConcurrentGroups() throws Exception {
-      ExecutorService executorService = Executors.newCachedThreadPool();
-      executorService.submit(new TestConsumer());
-      for (int i = 0; i < NUM_PRODUCERS; i++) {
-         executorService.submit(new TestProducer());
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(5, TimeUnit.MINUTES);
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-   }
-
-   class TestProducer implements Runnable {
-
-      public void produceMessages() throws Exception {
-         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-         connectionFactory.setExceptionListener(exceptionListener);
-         connectionFactory.setUseAsyncSend(true);
-         Connection connection = connectionFactory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(destination);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         String name = new String(new byte[2 * 1024]);
-         for (int i = 1; i <= NUM_TO_SEND_PER_PRODUCER; i++) {
-
-            TextMessage message = session.createTextMessage(name + "_" + i);
-            for (int j = 0; j < 100; j++) {
-               message.setStringProperty("Prop" + j, "" + j);
-            }
-            message.setStringProperty("JMSXGroupID", Thread.currentThread().getName() + i);
-            message.setIntProperty("JMSXGroupSeq", 1);
-            producer.send(message);
-         }
-
-         producer.close();
-         session.close();
-         connection.close();
-      }
-
-      @Override
-      public void run() {
-         try {
-            produceMessages();
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-            exceptions.put(Thread.currentThread(), e);
-         }
-      }
-   }
-
-   class TestConsumer implements Runnable {
-
-      private CountDownLatch finishLatch = new CountDownLatch(1);
-
-      public void consume() throws Exception {
-         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-
-         connectionFactory.setExceptionListener(exceptionListener);
-         final int totalMessageCount = NUM_TO_SEND_PER_PRODUCER * DESTINATIONS.length * NUM_PRODUCERS;
-         final AtomicInteger counter = new AtomicInteger();
-         final MessageListener listener = new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-
-               if (debug) {
-                  try {
-                     log.info(((TextMessage) message).getText());
-                  }
-                  catch (JMSException e) {
-                     e.printStackTrace();
-                  }
-               }
-
-               boolean first = false;
-               try {
-                  first = message.getBooleanProperty("JMSXGroupFirstForConsumer");
-               }
-               catch (JMSException e) {
-                  e.printStackTrace();
-                  exceptions.put(Thread.currentThread(), e);
-               }
-               assertTrue("Always is first message", first);
-               if (counter.incrementAndGet() == totalMessageCount) {
-                  log.info("Got all:" + counter.get());
-                  finishLatch.countDown();
-
-               }
-            }
-         };
-
-         int consumerCount = DESTINATIONS.length * 100;
-         Connection[] connections = new Connection[consumerCount];
-
-         Session[] sessions = new Session[consumerCount];
-         MessageConsumer[] consumers = new MessageConsumer[consumerCount];
-
-         for (int i = 0; i < consumerCount; i++) {
-            connections[i] = connectionFactory.createConnection();
-            connections[i].start();
-
-            sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            consumers[i] = sessions[i].createConsumer(DESTINATIONS[i % DESTINATIONS.length], null);
-            consumers[i].setMessageListener(listener);
-         }
-
-         log.info("received " + counter.get() + " messages");
-
-         assertTrue("got all messages in time", finishLatch.await(4, TimeUnit.MINUTES));
-
-         log.info("received " + counter.get() + " messages");
-
-         for (MessageConsumer consumer : consumers) {
-            consumer.close();
-         }
-
-         for (Session session : sessions) {
-            session.close();
-         }
-
-         for (Connection connection : connections) {
-            connection.close();
-         }
-      }
-
-      @Override
-      public void run() {
-         try {
-            consume();
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-            exceptions.put(Thread.currentThread(), e);
-         }
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
deleted file mode 100644
index b87fd1b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Assert;
-
-public class AMQ4116Test extends EmbeddedBrokerTestSupport {
-
-   private final String tcpAddr = "tcp://localhost:0";
-   private String connectionUri;
-
-   /**
-    * In this test, a message is produced and consumed from the test queue.
-    * Memory usage on the test queue should be reset to 0. The memory that was
-    * consumed is then sent to a second queue. Memory usage on the original
-    * test queue should remain 0, but actually increased when the second
-    * enqueue occurs.
-    */
-   public void testVMTransport() throws Exception {
-      runTest(connectionFactory);
-   }
-
-   /**
-    * This is an analog to the previous test, but occurs over TCP and passes.
-    */
-   public void testTCPTransport() throws Exception {
-      runTest(new ActiveMQConnectionFactory(connectionUri));
-   }
-
-   private void runTest(ConnectionFactory connFactory) throws Exception {
-      // Verify that test queue is empty and not using any memory.
-      Destination physicalDestination = broker.getDestination(destination);
-      Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-
-      // Enqueue a single message and verify that the test queue is using
-      // memory.
-      Connection conn = connFactory.createConnection();
-      conn.start();
-      Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-      MessageProducer producer = session.createProducer(destination);
-
-      producer.send(new ActiveMQMessage());
-
-      // Commit, which ensures message is in queue and memory usage updated.
-      session.commit();
-      Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0);
-
-      // Consume the message and verify that the test queue is no longer using
-      // any memory.
-      MessageConsumer consumer = session.createConsumer(destination);
-      Message received = consumer.receive();
-      Assert.assertNotNull(received);
-
-      // Commit, which ensures message is removed from queue and memory usage
-      // updated.
-      session.commit();
-      Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-
-      // Resend the message to a different queue and verify that the original
-      // test queue is still not using any memory.
-      ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second");
-      MessageProducer secondPproducer = session.createProducer(secondDestination);
-
-      secondPproducer.send(received);
-
-      // Commit, which ensures message is in queue and memory usage updated.
-      // NOTE: This assertion fails due to bug.
-      session.commit();
-      Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-
-      conn.stop();
-   }
-
-   /**
-    * Create an embedded broker that has both TCP and VM connectors.
-    */
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = super.createBroker();
-      connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString();
-      return broker;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
deleted file mode 100644
index d47c7c8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.Socket;
-import java.net.URI;
-
-import javax.management.ObjectName;
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLSocketFactory;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompConnection;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- */
-public class AMQ4126Test {
-
-   protected BrokerService broker;
-
-   protected String java_security_auth_login_config = "java.security.auth.login.config";
-   protected String xbean = "xbean:";
-   protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
-   protected String certBase = "src/test/resources/org/apache/activemq/security";
-   protected String JaasStompSSLBroker_xml = "JaasStompSSLBroker.xml";
-   protected StompConnection stompConnection = new StompConnection();
-   private final static String destinationName = "TEST.QUEUE";
-   protected String oldLoginConf = null;
-
-   @Before
-   public void before() throws Exception {
-      if (System.getProperty(java_security_auth_login_config) != null) {
-         oldLoginConf = System.getProperty(java_security_auth_login_config);
-      }
-      System.setProperty(java_security_auth_login_config, confBase + "/login.config");
-      broker = BrokerFactory.createBroker(xbean + confBase + "/" + JaasStompSSLBroker_xml);
-
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setUseJmx(true);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void after() throws Exception {
-      broker.stop();
-
-      if (oldLoginConf != null) {
-         System.setProperty(java_security_auth_login_config, oldLoginConf);
-      }
-   }
-
-   public Socket createSocket(String host, int port) throws Exception {
-      System.setProperty("javax.net.ssl.trustStore", certBase + "/broker1.ks");
-      System.setProperty("javax.net.ssl.trustStorePassword", "password");
-      System.setProperty("javax.net.ssl.trustStoreType", "jks");
-      System.setProperty("javax.net.ssl.keyStore", certBase + "/client.ks");
-      System.setProperty("javax.net.ssl.keyStorePassword", "password");
-      System.setProperty("javax.net.ssl.keyStoreType", "jks");
-
-      SocketFactory factory = SSLSocketFactory.getDefault();
-      return factory.createSocket(host, port);
-   }
-
-   public void stompConnectTo(String connectorName, String extraHeaders) throws Exception {
-      String host = broker.getConnectorByName(connectorName).getConnectUri().getHost();
-      int port = broker.getConnectorByName(connectorName).getConnectUri().getPort();
-      stompConnection.open(createSocket(host, port));
-      String extra = extraHeaders != null ? extraHeaders : "\n";
-      stompConnection.sendFrame("CONNECT\n" + extra + "\n" + Stomp.NULL);
-
-      StompFrame f = stompConnection.receive();
-      TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
-      stompConnection.close();
-   }
-
-   @Test
-   public void testStompSSLWithUsernameAndPassword() throws Exception {
-      stompConnectTo("stomp+ssl", "login:system\n" + "passcode:manager\n");
-   }
-
-   @Test
-   public void testStompSSLWithCertificate() throws Exception {
-      stompConnectTo("stomp+ssl", null);
-   }
-
-   @Test
-   public void testStompNIOSSLWithUsernameAndPassword() throws Exception {
-      stompConnectTo("stomp+nio+ssl", "login:system\n" + "passcode:manager\n");
-   }
-
-   @Test
-   public void testStompNIOSSLWithCertificate() throws Exception {
-      stompConnectTo("stomp+nio+ssl", null);
-   }
-
-   public void openwireConnectTo(String connectorName, String username, String password) throws Exception {
-      URI brokerURI = broker.getConnectorByName(connectorName).getConnectUri();
-      String uri = "ssl://" + brokerURI.getHost() + ":" + brokerURI.getPort();
-      ActiveMQSslConnectionFactory cf = new ActiveMQSslConnectionFactory(uri);
-      cf.setTrustStore("org/apache/activemq/security/broker1.ks");
-      cf.setTrustStorePassword("password");
-      cf.setKeyStore("org/apache/activemq/security/client.ks");
-      cf.setKeyStorePassword("password");
-      ActiveMQConnection connection = null;
-      if (username != null || password != null) {
-         connection = (ActiveMQConnection) cf.createConnection(username, password);
-      }
-      else {
-         connection = (ActiveMQConnection) cf.createConnection();
-      }
-      TestCase.assertNotNull(connection);
-      connection.start();
-      connection.stop();
-   }
-
-   @Test
-   public void testOpenwireSSLWithUsernameAndPassword() throws Exception {
-      openwireConnectTo("openwire+ssl", "system", "manager");
-   }
-
-   @Test
-   public void testOpenwireSSLWithCertificate() throws Exception {
-      openwireConnectTo("openwire+ssl", null, null);
-   }
-
-   @Test
-   public void testOpenwireNIOSSLWithUsernameAndPassword() throws Exception {
-      openwireConnectTo("openwire+nio+ssl", "system", "mmanager");
-   }
-
-   @Test
-   public void testOpenwireNIOSSLWithCertificate() throws Exception {
-      openwireConnectTo("openwire+nio+ssl", null, null);
-   }
-
-   @Test
-   public void testJmx() throws Exception {
-      TestCase.assertFalse(findDestination(destinationName));
-      broker.getAdminView().addQueue(destinationName);
-      TestCase.assertTrue(findDestination(destinationName));
-      broker.getAdminView().removeQueue(destinationName);
-      TestCase.assertFalse(findDestination(destinationName));
-   }
-
-   private boolean findDestination(String name) throws Exception {
-      ObjectName[] destinations = broker.getAdminView().getQueues();
-      for (ObjectName destination : destinations) {
-         if (destination.toString().contains(name)) {
-            return true;
-         }
-      }
-      return false;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
deleted file mode 100644
index 123413f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.Socket;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLSocketFactory;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompConnection;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4133Test {
-
-   protected String java_security_auth_login_config = "java.security.auth.login.config";
-   protected String xbean = "xbean:";
-   protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
-   protected String certBase = "src/test/resources/org/apache/activemq/security";
-   protected String activemqXml = "InconsistentConnectorPropertiesBehaviour.xml";
-   protected BrokerService broker;
-
-   protected String oldLoginConf = null;
-
-   @Before
-   public void before() throws Exception {
-      if (System.getProperty(java_security_auth_login_config) != null) {
-         oldLoginConf = System.getProperty(java_security_auth_login_config);
-      }
-      System.setProperty(java_security_auth_login_config, confBase + "/" + "login.config");
-      broker = BrokerFactory.createBroker(xbean + confBase + "/" + activemqXml);
-
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void after() throws Exception {
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-   }
-
-   @Test
-   public void stompSSLTransportNeedClientAuthTrue() throws Exception {
-      stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl").getConnectUri().getPort());
-   }
-
-   @Test
-   public void stompSSLNeedClientAuthTrue() throws Exception {
-      stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl+special").getConnectUri().getPort());
-   }
-
-   @Test
-   public void stompNIOSSLTransportNeedClientAuthTrue() throws Exception {
-      stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl").getConnectUri().getPort());
-   }
-
-   @Test
-   public void stompNIOSSLNeedClientAuthTrue() throws Exception {
-      stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl+special").getConnectUri().getPort());
-   }
-
-   public Socket createSocket(String host, int port) throws Exception {
-      System.setProperty("javax.net.ssl.trustStore", certBase + "/" + "broker1.ks");
-      System.setProperty("javax.net.ssl.trustStorePassword", "password");
-      System.setProperty("javax.net.ssl.trustStoreType", "jks");
-      System.setProperty("javax.net.ssl.keyStore", certBase + "/" + "client.ks");
-      System.setProperty("javax.net.ssl.keyStorePassword", "password");
-      System.setProperty("javax.net.ssl.keyStoreType", "jks");
-
-      SocketFactory factory = SSLSocketFactory.getDefault();
-      return factory.createSocket(host, port);
-   }
-
-   public void stompConnectTo(String host, int port) throws Exception {
-      StompConnection stompConnection = new StompConnection();
-      stompConnection.open(createSocket(host, port));
-      stompConnection.sendFrame("CONNECT\n" + "\n" + Stomp.NULL);
-      StompFrame f = stompConnection.receive();
-      TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
-      stompConnection.close();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
deleted file mode 100644
index d0096f1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.concurrent.Semaphore;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DemandForwardingBridgeSupport;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.Wait;
-
-/**
- * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when
- * bridges are VM-to-VM. Specifically, memory usage from the local broker is
- * manipulated by the remote broker.
- */
-public class AMQ4147Test extends JmsMultipleBrokersTestSupport {
-
-   /**
-    * This test demonstrates the bug: namely, when a message is bridged over
-    * the VMTransport, its memory usage continues to refer to the originating
-    * broker. As a result, memory usage is never accounted for on the remote
-    * broker, and the local broker's memory usage is only decreased once the
-    * message is consumed on the remote broker.
-    */
-   public void testVMTransportRemoteMemoryUsage() throws Exception {
-      BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-
-      BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
-      startAllBrokers();
-
-      // Forward messages from broker1 to broker2 over the VM transport.
-      bridgeBrokers("broker1", "broker2").start();
-
-      // Verify that broker1 and broker2's test queues have no memory usage.
-      ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false);
-      final Destination broker1TestQueue = broker1.getDestination(testQueue);
-      final Destination broker2TestQueue = broker2.getDestination(testQueue);
-
-      assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
-      assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
-      // Produce a message to broker1's test queue and verify that broker1's
-      // memory usage has increased, but broker2 still has no memory usage.
-      sendMessages("broker1", testQueue, 1);
-      assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
-      assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
-      // Create a consumer on broker2 that is synchronized to allow detection
-      // of "in flight" messages to the consumer.
-      MessageIdList broker2Messages = getBrokerMessages("broker2");
-      final Semaphore consumerReady = new Semaphore(0);
-      final Semaphore consumerProceed = new Semaphore(0);
-
-      broker2Messages.setParent(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            consumerReady.release();
-            try {
-               consumerProceed.acquire();
-            }
-            catch (InterruptedException ex) {
-               Thread.currentThread().interrupt();
-            }
-         }
-      });
-
-      createConsumer("broker2", testQueue);
-
-      // Verify that when broker2's consumer receives the message, the memory
-      // usage has moved broker1 to broker2. The first assertion is expected
-      // to fail due to the bug; the try/finally ensures the consumer is
-      // released prior to failure so that the broker can shut down.
-      consumerReady.acquire();
-
-      try {
-         assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-               return broker1TestQueue.getMemoryUsage().getUsage() == 0;
-            }
-         }));
-         assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
-      }
-      finally {
-         // Consume the message and verify that there is no more memory
-         // usage.
-         consumerProceed.release();
-      }
-
-      assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker1TestQueue.getMemoryUsage().getUsage() == 0;
-         }
-      }));
-      assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker2TestQueue.getMemoryUsage().getUsage() == 0;
-         }
-      }));
-   }
-
-   /**
-    * This test demonstrates that the bug is VMTransport-specific and does not
-    * occur when bridges occur using other protocols.
-    */
-   public void testTcpTransportRemoteMemoryUsage() throws Exception {
-      BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-
-      BrokerService broker2 = createBroker(new URI("broker:(tcp://localhost:61616)/broker2?persistent=false"));
-
-      startAllBrokers();
-
-      // Forward messages from broker1 to broker2 over the TCP transport.
-      bridgeBrokers("broker1", "broker2").start();
-
-      // Verify that broker1 and broker2's test queues have no memory usage.
-      ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false);
-      final Destination broker1TestQueue = broker1.getDestination(testQueue);
-      final Destination broker2TestQueue = broker2.getDestination(testQueue);
-
-      assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
-      assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
-      // Produce a message to broker1's test queue and verify that broker1's
-      // memory usage has increased, but broker2 still has no memory usage.
-      sendMessages("broker1", testQueue, 1);
-      assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
-      assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
-      // Create a consumer on broker2 that is synchronized to allow detection
-      // of "in flight" messages to the consumer.
-      MessageIdList broker2Messages = getBrokerMessages("broker2");
-      final Semaphore consumerReady = new Semaphore(0);
-      final Semaphore consumerProceed = new Semaphore(0);
-
-      broker2Messages.setParent(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            consumerReady.release();
-            try {
-               consumerProceed.acquire();
-            }
-            catch (InterruptedException ex) {
-               Thread.currentThread().interrupt();
-            }
-         }
-      });
-
-      createConsumer("broker2", testQueue);
-
-      // Verify that when broker2's consumer receives the message, the memory
-      // usage has moved broker1 to broker2.
-      consumerReady.acquire();
-
-      try {
-         assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-               return broker1TestQueue.getMemoryUsage().getUsage() == 0;
-            }
-         }));
-         assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
-      }
-      finally {
-         // Consume the message and verify that there is no more memory
-         // usage.
-         consumerProceed.release();
-      }
-
-      // Pause to allow ACK to be processed.
-      assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker1TestQueue.getMemoryUsage().getUsage() == 0;
-         }
-      }));
-      assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker2TestQueue.getMemoryUsage().getUsage() == 0;
-         }
-      }));
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
deleted file mode 100644
index 8558f48..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DemandForwardingBridgeSupport;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.Wait;
-import org.junit.Assert;
-
-/**
- * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby
- * a static subscription from broker1 to broker2 is forwarded to broker3 even
- * though the network TTL is 1. This results in duplicate subscriptions on
- * broker3.
- */
-public class AMQ4148Test extends JmsMultipleBrokersTestSupport {
-
-   public void test() throws Exception {
-      // Create a hub-and-spoke network where each hub-spoke pair share
-      // messages on a test queue.
-      BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false"));
-
-      final BrokerService[] spokes = new BrokerService[4];
-      for (int i = 0; i < spokes.length; i++) {
-         spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false"));
-
-      }
-      startAllBrokers();
-
-      ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false);
-
-      NetworkConnector[] ncs = new NetworkConnector[spokes.length];
-      for (int i = 0; i < spokes.length; i++) {
-         NetworkConnector nc = bridgeBrokers("hub", "spoke" + i);
-         nc.setNetworkTTL(1);
-         nc.setDuplex(true);
-         nc.setConduitSubscriptions(false);
-         nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue));
-         nc.start();
-
-         ncs[i] = nc;
-      }
-
-      waitForBridgeFormation();
-
-      // Pause to allow subscriptions to be created.
-      TimeUnit.SECONDS.sleep(5);
-
-      // Verify that the hub has a subscription from each spoke, but that each
-      // spoke has a single subscription from the hub (since the network TTL is 1).
-      final Destination hubTestQueue = hub.getDestination(testQueue);
-      assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}", Wait.waitFor(new Wait.Condition() {
-
-                    @Override
-                    public boolean isSatisified() throws Exception {
-                       return spokes.length == hubTestQueue.getConsumers().size();
-                    }
-                 }));
-
-      // Now check each spoke has exactly one consumer on the Queue.
-      for (int i = 0; i < 4; i++) {
-         Destination spokeTestQueue = spokes[i].getDestination(testQueue);
-         Assert.assertEquals(1, spokeTestQueue.getConsumers().size());
-      }
-
-      for (NetworkConnector nc : ncs) {
-         nc.stop();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
deleted file mode 100644
index f932a49..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionControl;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4157Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class);
-   private BrokerService broker;
-   private ActiveMQConnectionFactory connectionFactory;
-   private final Destination destination = new ActiveMQQueue("Test");
-   private final String payloadString = new String(new byte[8 * 1024]);
-   private final boolean useBytesMessage = true;
-   private final int parallelProducer = 20;
-   private final int parallelConsumer = 100;
-
-   private final Vector<Exception> exceptions = new Vector<>();
-   long toSend = 1000;
-
-   @Test
-   public void testPublishCountsWithRollbackConsumer() throws Exception {
-
-      startBroker(true);
-
-      final AtomicLong sharedCount = new AtomicLong(toSend);
-      ExecutorService executorService = Executors.newCachedThreadPool();
-
-      for (int i = 0; i < parallelConsumer; i++) {
-         executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  consumeOneAndRollback();
-               }
-               catch (Exception e) {
-                  exceptions.add(e);
-               }
-            }
-         });
-      }
-
-      for (int i = 0; i < parallelProducer; i++) {
-         executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  publishMessages(sharedCount, 0);
-               }
-               catch (Exception e) {
-                  exceptions.add(e);
-               }
-            }
-         });
-      }
-
-      executorService.shutdown();
-      executorService.awaitTermination(30, TimeUnit.MINUTES);
-      assertTrue("Producers done in time", executorService.isTerminated());
-      assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
-
-      restartBroker(500);
-
-      LOG.info("Attempting consume of {} messages", toSend);
-
-      consumeMessages(toSend);
-   }
-
-   private void consumeOneAndRollback() throws Exception {
-      ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-      connection.start();
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      MessageConsumer consumer = session.createConsumer(destination);
-      Message message = null;
-      while (message == null) {
-         message = consumer.receive(1000);
-      }
-      session.rollback();
-      connection.close();
-   }
-
-   private void consumeMessages(long count) throws Exception {
-      ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(destination);
-      for (int i = 0; i < count; i++) {
-         assertNotNull("got message " + i, consumer.receive(20000));
-      }
-      assertNull("none left over", consumer.receive(2000));
-   }
-
-   private void restartBroker(int restartDelay) throws Exception {
-      stopBroker();
-      TimeUnit.MILLISECONDS.sleep(restartDelay);
-      startBroker(false);
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-   }
-
-   private void publishMessages(AtomicLong count, int expiry) throws Exception {
-      ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-      connection.setWatchTopicAdvisories(false);
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer producer = session.createProducer(destination);
-      while ((count.getAndDecrement()) > 0) {
-         Message message = null;
-         if (useBytesMessage) {
-            message = session.createBytesMessage();
-            ((BytesMessage) message).writeBytes(payloadString.getBytes());
-         }
-         else {
-            message = session.createTextMessage(payloadString);
-         }
-         producer.send(message, DeliveryMode.PERSISTENT, 5, expiry);
-      }
-      connection.syncSendPacket(new ConnectionControl());
-      connection.close();
-   }
-
-   public void startBroker(boolean deleteAllMessages) throws Exception {
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
-      broker.addConnector("tcp://0.0.0.0:0");
-      broker.start();
-
-      String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
-      connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
deleted file mode 100644
index 0cd8e0b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerFilter;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.DiscoveryEvent;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkBridge;
-import org.apache.activemq.network.NetworkBridgeListener;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
-import org.apache.activemq.transport.discovery.DiscoveryListener;
-import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
-import org.junit.Assert;
-
-/**
- * This test demonstrates a number of race conditions in
- * {@link DiscoveryNetworkConnector} that can result in an active bridge no
- * longer being reported as active and vice-versa, an inactive bridge still
- * being reported as active.
- */
-public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
-
-   final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
-
-   /**
-    * Since these tests involve wait conditions, protect against indefinite
-    * waits (due to unanticipated issues).
-    */
-   @Override
-   public void setUp() throws Exception {
-      setAutoFail(true);
-      setMaxTestTime(MAX_TEST_TIME);
-      super.setUp();
-   }
-
-   /**
-    * This test demonstrates how concurrent attempts to establish a bridge to
-    * the same remote broker are allowed to occur. Connection uniqueness will
-    * cause whichever bridge creation attempt is second to fail. However, this
-    * failure erases the entry in
-    * {@link DiscoveryNetworkConnector#activeBridges()} that represents the
-    * successful first bridge creation attempt.
-    */
-   public void testLostActiveBridge() throws Exception {
-      final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
-
-      // Start two brokers with a bridge from broker1 to broker2.
-      BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-      final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
-      // Allow the concurrent local bridge connections to be made even though
-      // they are duplicated; this prevents both of the bridge attempts from
-      // failing in the case that the local and remote bridges are established
-      // out-of-order.
-      BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() {
-         @Override
-         public Broker installPlugin(Broker broker) throws Exception {
-            return new BrokerFilter(broker) {
-               @Override
-               public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
-                  // ignore
-               }
-            };
-         }
-      };
-
-      broker1.setPlugins(new BrokerPlugin[]{ignoreAddConnectionPlugin});
-
-      startAllBrokers();
-
-      // Start a bridge from broker1 to broker2. The discovery agent attempts
-      // to create the bridge concurrently with two threads, and the
-      // synchronization in createBridge ensures that pre-patch both threads
-      // actually attempt to start bridges. Post-patch, only one thread is
-      // allowed to start the bridge.
-      final CountDownLatch attemptLatch = new CountDownLatch(2);
-      final CountDownLatch createLatch = new CountDownLatch(2);
-
-      DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
-         @Override
-         public void onServiceAdd(DiscoveryEvent event) {
-            // Pre-and-post patch, two threads attempt to establish a bridge
-            // to the same remote broker.
-            attemptLatch.countDown();
-            super.onServiceAdd(event);
-         }
-
-         @Override
-         protected NetworkBridge createBridge(Transport localTransport,
-                                              Transport remoteTransport,
-                                              final DiscoveryEvent event) {
-            // Pre-patch, the two threads are allowed to create the bridge.
-            // Post-patch, only the first thread is allowed. Wait a
-            // reasonable delay once both attempts are detected to allow
-            // the two bridge creations to occur concurrently (pre-patch).
-            // Post-patch, the wait will timeout and allow the first (and
-            // only) bridge creation to occur.
-            try {
-               attemptLatch.await();
-               createLatch.countDown();
-               createLatch.await(ATTEMPT_TO_CREATE_DELAY, TimeUnit.MILLISECONDS);
-               return super.createBridge(localTransport, remoteTransport, event);
-            }
-            catch (InterruptedException e) {
-               Thread.interrupted();
-               return null;
-            }
-         }
-      };
-
-      nc.setDiscoveryAgent(new DiscoveryAgent() {
-         TaskRunnerFactory taskRunner = new TaskRunnerFactory();
-         DiscoveryListener listener;
-
-         @Override
-         public void start() throws Exception {
-            taskRunner.init();
-            taskRunner.execute(new Runnable() {
-               @Override
-               public void run() {
-                  listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString()));
-               }
-            });
-            taskRunner.execute(new Runnable() {
-               @Override
-               public void run() {
-                  listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString()));
-               }
-            });
-         }
-
-         @Override
-         public void stop() throws Exception {
-            taskRunner.shutdown();
-         }
-
-         @Override
-         public void setDiscoveryListener(DiscoveryListener listener) {
-            this.listener = listener;
-         }
-
-         @Override
-         public void registerService(String name) throws IOException {
-         }
-
-         @Override
-         public void serviceFailed(DiscoveryEvent event) throws IOException {
-            listener.onServiceRemove(event);
-         }
-      });
-
-      broker1.addNetworkConnector(nc);
-      nc.start();
-
-      // Wait for the bridge to be formed by the first attempt.
-      waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
-
-      // Pre-patch, the second bridge creation attempt fails and removes the
-      // first (successful) bridge creation attempt from the
-      // list of active bridges. Post-patch, the second bridge creation
-      // attempt is prevented, so the first bridge creation attempt
-      // remains "active". This assertion is expected to fail pre-patch and
-      // pass post-patch.
-      Assert.assertFalse(nc.activeBridges().isEmpty());
-   }
-
-   /**
-    * This test demonstrates a race condition where a failed bridge can be
-    * removed from the list of active bridges in
-    * {@link DiscoveryNetworkConnector} before it has been added. Eventually,
-    * the failed bridge is added, but never removed, which causes subsequent
-    * bridge creation attempts to be ignored. The result is a network connector
-    * that thinks it has an active bridge, when in fact it doesn't.
-    */
-   public void testInactiveBridgStillActive() throws Exception {
-      // Start two brokers with a bridge from broker1 to broker2.
-      BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-      final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
-      // Force bridge failure by having broker1 disallow connections.
-      BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() {
-         @Override
-         public Broker installPlugin(Broker broker) throws Exception {
-            return new BrokerFilter(broker) {
-               @Override
-               public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
-                  throw new Exception("Test exception to force bridge failure");
-               }
-            };
-         }
-      };
-
-      broker1.setPlugins(new BrokerPlugin[]{disallowAddConnectionPlugin});
-
-      startAllBrokers();
-
-      // Start a bridge from broker1 to broker2. The bridge delays returning
-      // from start until after the bridge failure has been processed;
-      // this leaves the first bridge creation attempt recorded as active,
-      // even though it failed.
-      final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
-      da.setServices(new URI[]{broker2.getVmConnectorURI()});
-
-      final CountDownLatch attemptLatch = new CountDownLatch(3);
-      final CountDownLatch removedLatch = new CountDownLatch(1);
-
-      DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
-         @Override
-         public void onServiceAdd(DiscoveryEvent event) {
-            attemptLatch.countDown();
-            super.onServiceAdd(event);
-         }
-
-         @Override
-         public void onServiceRemove(DiscoveryEvent event) {
-            super.onServiceRemove(event);
-            removedLatch.countDown();
-         }
-
-         @Override
-         protected NetworkBridge createBridge(Transport localTransport,
-                                              Transport remoteTransport,
-                                              final DiscoveryEvent event) {
-            final NetworkBridge next = super.createBridge(localTransport, remoteTransport, event);
-            return new NetworkBridge() {
-
-               @Override
-               public void start() throws Exception {
-                  next.start();
-                  // Delay returning until the failed service has been
-                  // removed.
-                  removedLatch.await();
-               }
-
-               @Override
-               public void stop() throws Exception {
-                  next.stop();
-               }
-
-               @Override
-               public void serviceRemoteException(Throwable error) {
-                  next.serviceRemoteException(error);
-               }
-
-               @Override
-               public void serviceLocalException(Throwable error) {
-                  next.serviceLocalException(error);
-               }
-
-               @Override
-               public void setNetworkBridgeListener(NetworkBridgeListener listener) {
-                  next.setNetworkBridgeListener(listener);
-               }
-
-               @Override
-               public String getRemoteAddress() {
-                  return next.getRemoteAddress();
-               }
-
-               @Override
-               public String getRemoteBrokerName() {
-                  return next.getRemoteBrokerName();
-               }
-
-               @Override
-               public String getRemoteBrokerId() {
-                  return next.getRemoteBrokerId();
-               }
-
-               @Override
-               public String getLocalAddress() {
-                  return next.getLocalAddress();
-               }
-
-               @Override
-               public String getLocalBrokerName() {
-                  return next.getLocalBrokerName();
-               }
-
-               @Override
-               public long getEnqueueCounter() {
-                  return next.getEnqueueCounter();
-               }
-
-               @Override
-               public long getDequeueCounter() {
-                  return next.getDequeueCounter();
-               }
-
-               @Override
-               public void setMbeanObjectName(ObjectName objectName) {
-                  next.setMbeanObjectName(objectName);
-               }
-
-               @Override
-               public ObjectName getMbeanObjectName() {
-                  return next.getMbeanObjectName();
-               }
-
-               @Override
-               public void resetStats() {
-                  next.resetStats();
-               }
-            };
-         }
-      };
-      nc.setDiscoveryAgent(da);
-
-      broker1.addNetworkConnector(nc);
-      nc.start();
-
-      // All bridge attempts should fail, so the attempt latch should get
-      // triggered. However, because of the race condition, the first attempt
-      // is considered successful and causes further attempts to stop.
-      // Therefore, this wait will time out and cause the test to fail.
-      Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
-   }
-
-   /**
-    * This test verifies that when a network connector is restarted, any
-    * bridges that were active at the time of the stop are allowed to be
-    * re-established (i.e., the "active events" data structure in
-    * {@link DiscoveryNetworkConnector} is reset.
-    */
-   public void testAllowAttemptsAfterRestart() throws Exception {
-      final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10);
-
-      // Start two brokers with a bridge from broker1 to broker2.
-      BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-      final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
-      startAllBrokers();
-
-      // Start a bridge from broker1 to broker2.
-      NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(), broker2.getBrokerName());
-      nc.start();
-
-      waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
-
-      // Restart the network connector and verify that the bridge is
-      // re-established. The pause between start/stop is to account for the
-      // asynchronous closure.
-      nc.stop();
-      Thread.sleep(STOP_DELAY);
-      nc.start();
-
-      waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
-   }
-}