You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:42 UTC

[09/15] activemq-6 git commit: Refactored the testsuite a bit

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
new file mode 100644
index 0000000..de916f3
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -0,0 +1,2449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.jms.bridge;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
+import org.apache.activemq.jms.bridge.QualityOfServiceMode;
+import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
+import org.apache.activemq.jms.client.ActiveMQMessage;
+import org.apache.activemq.service.extensions.ServiceUtils;
+import org.apache.activemq.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.tests.integration.ra.DummyTransactionManager;
+import org.apache.activemq.utils.DefaultSensitiveStringCodec;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * A JMSBridgeTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class JMSBridgeTest extends BridgeTestBase
+{
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   // MaxBatchSize but no MaxBatchTime
+
+   @Test
+   public void testNoMaxBatchTime_AtMostOnce_P() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTime_DuplicatesOk_P() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
+   }
+
+   @Test
+   public void testNoMaxBatchTime_DuplicatesOk_NP() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
+   }
+
+   @Test
+   public void testNoMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
+   {
+      testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
+   }
+
+   // Same server
+
+   // MaxBatchSize but no MaxBatchTime
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
+   }
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
+   }
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
+   }
+
+   @Test
+   public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
+   {
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
+   }
+
+   // MaxBatchTime but no MaxBatchSize
+
+   @Test
+   public void testMaxBatchTime_AtMostOnce_P() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
+   }
+
+   @Test
+   public void testMaxBatchTime_DuplicatesOk_P() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
+   }
+
+   @Test
+   public void testMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
+   }
+
+   @Test
+   public void testMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
+   }
+
+   @Test
+   public void testMaxBatchTime_DuplicatesOk_NP() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
+   }
+
+   @Test
+   public void testMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
+   {
+      testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
+   }
+
+   // Same server
+
+   // MaxBatchTime but no MaxBatchSize
+
+   @Test
+   public void testMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
+   }
+
+   @Test
+   public void testMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
+   }
+
+   @Test
+   public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
+   }
+
+   @Test
+   public void testMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
+   }
+
+   @Test
+   public void testMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
+   }
+
+   @Test
+   public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
+   {
+      testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
+   }
+
+   // Stress with batch size of 50
+
+   @Test
+   public void testStress_AtMostOnce_P_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
+   }
+
+   @Test
+   public void testStress_DuplicatesOk_P_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.DUPLICATES_OK, true, 50);
+   }
+
+   @Test
+   public void testStress_OnceAndOnlyOnce_P_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
+   }
+
+   @Test
+   public void testStress_AtMostOnce_NP_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
+   }
+
+   @Test
+   public void testStress_DuplicatesOk_NP_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.DUPLICATES_OK, false, 50);
+   }
+
+   @Test
+   public void testStress_OnceAndOnlyOnce_NP_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
+   }
+
+   // Stress with batch size of 1
+
+   @Test
+   public void testStress_AtMostOnce_P_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
+   }
+
+   @Test
+   public void testStress_DuplicatesOk_P_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.DUPLICATES_OK, true, 1);
+   }
+
+   @Test
+   public void testStress_OnceAndOnlyOnce_P_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
+   }
+
+   @Test
+   public void testStress_AtMostOnce_NP_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
+   }
+
+   @Test
+   public void testStress_DuplicatesOk_NP_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.DUPLICATES_OK, false, 1);
+   }
+
+   @Test
+   public void testStress_OnceAndOnlyOnce_NP_1() throws Exception
+   {
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
+   }
+
+   // Max batch time
+
+   @Test
+   public void testStressMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
+   {
+      testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 200);
+   }
+
+   @Test
+   public void testStressMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
+   {
+      testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 200);
+   }
+
+   // Stress on same server
+
+   // Stress with batch size of 50
+
+   @Test
+   public void testStressSameServer_AtMostOnce_P_50() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
+   }
+
+   @Test
+   public void testStressSameServer_DuplicatesOk_P_50() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 50);
+   }
+
+   @Test
+   public void testStressSameServer_OnceAndOnlyOnce_P_50() throws Exception
+   {
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
+   }
+
+   @Test
+   public void testStressSameServer_AtMostOnce_NP_50() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
+   }
+
+   @Test
+   public void testStressSameServer_DuplicatesOk_NP_50() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 50);
+   }
+
+   @Test
+   public void testStressSameServer_OnceAndOnlyOnce_NP_50() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
+   }
+
+   // Stress with batch size of 1
+
+   @Test
+   public void testStressSameServer_AtMostOnce_P_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
+   }
+
+   @Test
+   public void testStressSameServer_DuplicatesOk_P_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 1);
+   }
+
+   @Test
+   public void testStressSameServer_OnceAndOnlyOnce_P_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
+   }
+
+   @Test
+   public void testStressSameServer_AtMostOnce_NP_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
+   }
+
+   @Test
+   public void testStressSameServer_DuplicatesOk_NP_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 1);
+   }
+
+   @Test
+   public void testStressSameServer_OnceAndOnlyOnce_NP_1() throws Exception
+   {
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
+   }
+
+   @Test
+   public void testStartBridgeFirst() throws Exception
+   {
+      //stop the source server, we want to start the bridge first
+      jmsServer0.stop();
+      JMSBridgeImpl bridge = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    NUM_MESSAGES,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         //now start the server
+         jmsServer0.start();
+         createQueue("sourceQueue", 0);
+         createQueue("localTargetQueue", 0);
+         jmsServer0.createTopic(false, "sourceTopic", "/topic/sourceTopic");
+         // Send half the messages
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, false, false);
+
+         // Verify none are received
+
+         checkEmpty(targetQueue, 1);
+
+         // Send the other half
+
+         sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, false, false);
+
+         // This should now be receivable
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         // Send another batch with one more than batch size
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES + 1, false, false);
+
+         // Make sure only batch size are received
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         // Final batch
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, false, false);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, NUM_MESSAGES, 1, false);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES - 1, false);
+
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            JMSBridgeTest.log.info("Stopping bridge");
+            bridge.stop();
+         }
+      }
+   }
+
+   @Test
+   public void testParams() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      QualityOfServiceMode qosMode = QualityOfServiceMode.AT_MOST_ONCE;
+
+      int batchSize = 10;
+
+      int maxBatchTime = -1;
+
+      String sourceUsername = null;
+
+      String sourcePassword = null;
+
+      String destUsername = null;
+
+      String destPassword = null;
+
+      String selector = null;
+
+      long failureRetryInterval = 5000;
+
+      int maxRetries = 10;
+
+      String subName = null;
+
+      String clientID = null;
+
+      try
+      {
+         bridge =
+            new JMSBridgeImpl(null,
+                              cff1,
+                              sourceQueueFactory,
+                              targetQueueFactory,
+                              sourceUsername,
+                              sourcePassword,
+                              destUsername,
+                              destPassword,
+                              selector,
+                              failureRetryInterval,
+                              maxRetries,
+                              qosMode,
+                              batchSize,
+                              maxBatchTime,
+                              subName,
+                              clientID,
+                              false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    null,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    failureRetryInterval,
+                                    maxRetries,
+                                    qosMode,
+                                    batchSize,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    null,
+                                    targetQueueFactory,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    failureRetryInterval,
+                                    maxRetries,
+                                    qosMode,
+                                    batchSize,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    null,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    failureRetryInterval,
+                                    maxRetries,
+                                    qosMode,
+                                    batchSize,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    -2,
+                                    maxRetries,
+                                    qosMode,
+                                    batchSize,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    -1,
+                                    10,
+                                    qosMode,
+                                    batchSize,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    null,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    failureRetryInterval,
+                                    maxRetries,
+                                    qosMode,
+                                    0,
+                                    maxBatchTime,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    null,
+                                    sourceUsername,
+                                    sourcePassword,
+                                    destUsername,
+                                    destPassword,
+                                    selector,
+                                    failureRetryInterval,
+                                    maxRetries,
+                                    qosMode,
+                                    batchSize,
+                                    -2,
+                                    subName,
+                                    clientID,
+                                    false);
+         fail("expected exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // Ok
+      }
+      finally
+      {
+         stopComponent(bridge);
+      }
+   }
+
+   @Test
+   public void testStartStopStart() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         bridge.stop();
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            prod.send(tm);
+         }
+
+         connTarget = cf1.createConnection();
+         Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(10000);
+            Assert.assertNotNull(tm);
+            Assert.assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons.receiveNoWait();
+         Assert.assertNull(m);
+      }
+      finally
+      {
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         removeAllMessages(sourceQueue.getQueueName(), 0);
+      }
+   }
+
+   @Test
+   public void testSelector() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         String selector = "vegetable='radish'";
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    selector,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            if (i >= NUM_MESSAGES / 2)
+            {
+               tm.setStringProperty("vegetable", "radish");
+            }
+            else
+            {
+               tm.setStringProperty("vegetable", "cauliflower");
+            }
+
+            prod.send(tm);
+         }
+
+         connTarget = cf1.createConnection();
+
+         Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(10000);
+
+            Assert.assertNotNull(tm);
+
+            Assert.assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons.receiveNoWait();
+
+         Assert.assertNull(m);
+
+      }
+      finally
+      {
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         removeAllMessages(sourceQueue.getQueueName(), 0);
+      }
+   }
+
+   @Test
+   public void testMaskPassword() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      DefaultSensitiveStringCodec codec = new DefaultSensitiveStringCodec();
+      String mask = (String) codec.encode("guest");
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0, cff1, sourceQueueFactory,
+                                    targetQueueFactory, "guest", mask, "guest", mask, null, 5000,
+                                    10, QualityOfServiceMode.AT_MOST_ONCE, 1, -1, null, null, false);
+
+         bridge.setUseMaskedPassword(true);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false,
+                                                     Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         connTarget = cf1.createConnection();
+
+         Session sessRec = connTarget.createSession(false,
+                                                    Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(10000);
+
+            Assert.assertNotNull(tm);
+
+            Assert.assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons.receiveNoWait();
+
+         Assert.assertNull(m);
+
+      }
+      finally
+      {
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         removeAllMessages(sourceQueue.getQueueName(), 0);
+      }
+   }
+
+   @Test
+   public void testPasswordCodec() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      DefaultSensitiveStringCodec codec = new DefaultSensitiveStringCodec();
+      Map<String, String> prop = new HashMap<String, String>();
+      prop.put("key", "bridgekey");
+      codec.init(prop);
+
+      String mask = (String) codec.encode("guest");
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0, cff1, sourceQueueFactory,
+                                    targetQueueFactory, "guest", mask, "guest", mask, null, 5000,
+                                    10, QualityOfServiceMode.AT_MOST_ONCE, 1, -1, null, null, false);
+
+         bridge.setUseMaskedPassword(true);
+         bridge.setPasswordCodec(codec.getClass().getName() + ";key=bridgekey");
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false,
+                                                     Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         connTarget = cf1.createConnection();
+
+         Session sessRec = connTarget.createSession(false,
+                                                    Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(10000);
+
+            Assert.assertNotNull(tm);
+
+            Assert.assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons.receiveNoWait();
+
+         Assert.assertNull(m);
+
+      }
+      finally
+      {
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         removeAllMessages(sourceQueue.getQueueName(), 0);
+      }
+   }
+
+   @Test
+   public void testStartBridgeWithJTATransactionAlreadyRunningLargeMessage() throws Exception
+   {
+      internalTestStartBridgeWithJTATransactionAlreadyRunning(true);
+   }
+
+   @Test
+   public void testStartBridgeWithJTATransactionAlreadyRunningRegularMessage() throws Exception
+   {
+      internalTestStartBridgeWithJTATransactionAlreadyRunning(false);
+   }
+
+   public void internalTestStartBridgeWithJTATransactionAlreadyRunning(final boolean largeMessage) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Transaction toResume = null;
+
+      Transaction started = null;
+
+      TransactionManager mgr = newTransactionManager();
+
+      try
+      {
+
+         toResume = mgr.suspend();
+
+         mgr.begin();
+
+         started = mgr.getTransaction();
+
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceTopicFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+         bridge.start();
+
+         sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
+      }
+      finally
+      {
+         if (started != null)
+         {
+            try
+            {
+               started.rollback();
+            }
+            catch (Exception e)
+            {
+               JMSBridgeTest.log.error("Failed to rollback", e);
+            }
+         }
+
+         if (toResume != null)
+         {
+            try
+            {
+               mgr.resume(toResume);
+            }
+            catch (Exception e)
+            {
+               JMSBridgeTest.log.error("Failed to resume", e);
+            }
+         }
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   @Test
+   public void testNonDurableSubscriberLargeMessage() throws Exception
+   {
+      internalTestNonDurableSubscriber(true, 1);
+   }
+
+   @Test
+   public void testNonDurableSubscriberRegularMessage() throws Exception
+   {
+      internalTestNonDurableSubscriber(false, 1);
+   }
+
+   public void internalTestNonDurableSubscriber(final boolean largeMessage, final int batchSize) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceTopicFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    batchSize,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   @Test
+   public void testDurableSubscriberLargeMessage() throws Exception
+   {
+      internalTestDurableSubscriber(true, 1);
+   }
+
+   @Test
+   public void testDurableSubscriberRegularMessage() throws Exception
+   {
+      internalTestDurableSubscriber(false, 1);
+   }
+
+   public void internalTestDurableSubscriber(final boolean largeMessage, final int batchSize) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceTopicFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    batchSize,
+                                    -1,
+                                    "subTest",
+                                    "clientid123",
+                                    false);
+
+         bridge.start();
+
+         sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true, largeMessage);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         // Now unsubscribe
+         Connection conn = cf0.createConnection();
+         conn.setClientID("clientid123");
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         sess.unsubscribe("subTest");
+         conn.close();
+      }
+   }
+
+   @Test
+   public void testMessageIDInHeaderOn() throws Exception
+   {
+      messageIDInHeader(true);
+   }
+
+   @Test
+   public void testMessageIDInHeaderOff() throws Exception
+   {
+      messageIDInHeader(false);
+   }
+
+   private void messageIDInHeader(final boolean on) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    on);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         connTarget = cf1.createConnection();
+
+         JMSBridgeTest.log.trace("Sending " + NUM_MESSAGES + " messages");
+
+         List<String> ids1 = new ArrayList<String>();
+
+         Session sessSource = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSource.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSource.createTextMessage("message" + i);
+
+            // We add some properties to make sure they get passed through ok
+            tm.setStringProperty("wib", "uhuh");
+            tm.setBooleanProperty("cheese", true);
+            tm.setIntProperty("Sausages", 23);
+            tm.setByteProperty("bacon", (byte) 12);
+            tm.setDoubleProperty("toast", 17261762.12121d);
+            tm.setFloatProperty("orange", 1212.1212f);
+            tm.setLongProperty("blurg", 817217827L);
+            tm.setShortProperty("stst", (short) 26363);
+
+            //Set some JMS headers too
+
+            //And also set a core props
+            ((ActiveMQMessage) tm).getCoreMessage().putBytesProperty("bytes", new byte[]{1, 2, 3});
+
+            // We add some JMSX ones too
+
+            tm.setStringProperty("JMSXGroupID", "mygroup543");
+
+            prod.send(tm);
+
+            ids1.add(tm.getJMSMessageID());
+         }
+
+         JMSBridgeTest.log.trace("Sent the first messages");
+
+         Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         List<TextMessage> msgs = new ArrayList<TextMessage>();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(5000);
+
+            Assert.assertNotNull(tm);
+
+            Assert.assertEquals("message" + i, tm.getText());
+
+            Assert.assertEquals("uhuh", tm.getStringProperty("wib"));
+            Assert.assertTrue(tm.getBooleanProperty("cheese"));
+            Assert.assertEquals(23, tm.getIntProperty("Sausages"));
+            assertEquals((byte) 12, tm.getByteProperty("bacon"));
+            assertEquals(17261762.12121d, tm.getDoubleProperty("toast"), 0.000000001);
+            assertEquals(1212.1212f, tm.getFloatProperty("orange"), 0.000001);
+            assertEquals(817217827L, tm.getLongProperty("blurg"));
+            assertEquals((short) 26363, tm.getShortProperty("stst"));
+
+            assertEqualsByteArrays(new byte[]{1, 2, 3}, ((ActiveMQMessage) tm).getCoreMessage().getBytesProperty("bytes"));
+
+            Assert.assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
+
+            if (on)
+            {
+               String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+               Assert.assertNotNull(header);
+
+               Assert.assertEquals(ids1.get(i), header);
+
+               msgs.add(tm);
+            }
+         }
+
+         if (on)
+         {
+            // Now we send them again back to the source
+
+            Iterator<TextMessage> iter = msgs.iterator();
+
+            List<String> ids2 = new ArrayList<String>();
+
+            while (iter.hasNext())
+            {
+               Message msg = iter.next();
+
+               prod.send(msg);
+
+               ids2.add(msg.getJMSMessageID());
+            }
+
+            // And consume them again
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage) cons.receive(5000);
+
+               Assert.assertNotNull(tm);
+
+               Assert.assertEquals("message" + i, tm.getText());
+
+               Assert.assertEquals("uhuh", tm.getStringProperty("wib"));
+               Assert.assertTrue(tm.getBooleanProperty("cheese"));
+               Assert.assertEquals(23, tm.getIntProperty("Sausages"));
+               assertEquals((byte) 12, tm.getByteProperty("bacon"));
+               assertEquals(17261762.12121d, tm.getDoubleProperty("toast"), 0.000001);
+               assertEquals(1212.1212f, tm.getFloatProperty("orange"), 0.0000001);
+               assertEquals(817217827L, tm.getLongProperty("blurg"));
+               assertEquals((short) 26363, tm.getShortProperty("stst"));
+
+               assertEqualsByteArrays(new byte[]{1, 2, 3}, ((ActiveMQMessage) tm).getCoreMessage().getBytesProperty("bytes"));
+
+               Assert.assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
+
+               String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+               Assert.assertNotNull(header);
+
+               Assert.assertEquals(ids1.get(i) + "," + ids2.get(i), header);
+            }
+         }
+
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+      }
+   }
+
+   @Test
+   public void testPropertiesPreservedPOn() throws Exception
+   {
+      propertiesPreserved(true, true);
+   }
+
+   @Test
+   public void testPropertiesPreservedNPoff() throws Exception
+   {
+      propertiesPreserved(false, true);
+   }
+
+   @Test
+   public void testPropertiesPreservedNPOn() throws Exception
+   {
+      propertiesPreserved(false, true);
+   }
+
+   @Test
+   public void testPropertiesPreservedPoff() throws Exception
+   {
+      propertiesPreserved(true, true);
+   }
+
+   private void propertiesPreserved(final boolean persistent, final boolean messageIDInHeader) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    messageIDInHeader);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         connTarget = cf1.createConnection();
+
+         JMSBridgeTest.log.trace("Sending " + NUM_MESSAGES + " messages");
+
+         Session sessSource = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         MessageProducer prod = sessSource.createProducer(sourceQueue);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         TextMessage tm = sessSource.createTextMessage("blahmessage");
+
+         prod.setPriority(7);
+
+         prod.setTimeToLive(1 * 60 * 60 * 1000);
+
+         prod.send(tm);
+
+         long expiration = tm.getJMSExpiration();
+
+         Assert.assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT,
+                             tm.getJMSDeliveryMode());
+
+         tm = (TextMessage) cons.receive(1000);
+
+         Assert.assertNotNull(tm);
+
+         Assert.assertEquals("blahmessage", tm.getText());
+
+         Assert.assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT,
+                             tm.getJMSDeliveryMode());
+
+         Assert.assertEquals(7, tm.getJMSPriority());
+
+         Assert.assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+
+         Message m = cons.receive(5000);
+
+         Assert.assertNull(m);
+
+         // Now do one with expiration = 0
+
+         tm = sessSource.createTextMessage("blahmessage2");
+
+         prod.setPriority(7);
+
+         prod.setTimeToLive(0);
+
+         prod.send(tm);
+
+         Assert.assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT,
+                             tm.getJMSDeliveryMode());
+
+         tm = (TextMessage) cons.receive(1000);
+
+         Assert.assertNotNull(tm);
+
+         Assert.assertEquals("blahmessage2", tm.getText());
+
+         Assert.assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT,
+                             tm.getJMSDeliveryMode());
+
+         Assert.assertEquals(7, tm.getJMSPriority());
+
+         Assert.assertEquals(0, tm.getJMSExpiration());
+
+         m = cons.receive(5000);
+
+         Assert.assertNull(m);
+
+         tm = sessSource.createTextMessage("blahmessage3");
+
+         final boolean myBool = false;
+         final byte myByte = (byte) 23;
+         final double myDouble = 17625765d;
+         final float myFloat = 87127.23f;
+         final int myInt = 123;
+         final long myLong = 81728712;
+         final short myShort = (short) 88;
+         final String myString = "ojweodewj";
+         final String myJMSX = "aardvark";
+
+         tm.setBooleanProperty("mybool", myBool);
+         tm.setByteProperty("mybyte", myByte);
+         tm.setDoubleProperty("mydouble", myDouble);
+         tm.setFloatProperty("myfloat", myFloat);
+         tm.setIntProperty("myint", myInt);
+         tm.setLongProperty("mylong", myLong);
+         tm.setShortProperty("myshort", myShort);
+         tm.setStringProperty("mystring", myString);
+
+         tm.setStringProperty("JMSXMyNaughtyJMSXProperty", myJMSX);
+
+         prod.send(tm);
+
+         tm = (TextMessage) cons.receive(5000);
+
+         Assert.assertNotNull(tm);
+
+         Assert.assertEquals("blahmessage3", tm.getText());
+
+         Assert.assertEquals(myBool, tm.getBooleanProperty("mybool"));
+         Assert.assertEquals(myByte, tm.getByteProperty("mybyte"));
+         Assert.assertEquals(myDouble, tm.getDoubleProperty("mydouble"), 0.000001);
+         Assert.assertEquals(myFloat, tm.getFloatProperty("myfloat"), 0.000001);
+         Assert.assertEquals(myInt, tm.getIntProperty("myint"));
+         Assert.assertEquals(myLong, tm.getLongProperty("mylong"));
+         Assert.assertEquals(myShort, tm.getShortProperty("myshort"));
+         Assert.assertEquals(myString, tm.getStringProperty("mystring"));
+         Assert.assertEquals(myJMSX, tm.getStringProperty("JMSXMyNaughtyJMSXProperty"));
+
+         m = cons.receive(5000);
+
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+      }
+   }
+
+   @Test
+   public void testNoMessageIDInHeader() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         connTarget = cf1.createConnection();
+
+         JMSBridgeTest.log.trace("Sending " + NUM_MESSAGES + " messages");
+
+         Session sessSource = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSource.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSource.createTextMessage("message" + i);
+
+            // We add some headers to make sure they get passed through ok
+            tm.setStringProperty("wib", "uhuh");
+            tm.setBooleanProperty("cheese", true);
+            tm.setIntProperty("Sausages", 23);
+
+            prod.send(tm);
+         }
+
+         JMSBridgeTest.log.trace("Sent the first messages");
+
+         Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons.receive(5000);
+
+            Assert.assertNotNull(tm);
+
+            Assert.assertEquals("message" + i, tm.getText());
+
+            Assert.assertEquals("uhuh", tm.getStringProperty("wib"));
+            Assert.assertTrue(tm.getBooleanProperty("cheese"));
+            Assert.assertEquals(23, tm.getIntProperty("Sausages"));
+
+            String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+            Assert.assertNull(header);
+         }
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+      }
+   }
+
+   // Private -------------------------------------------------------------------------------
+
+   private void testStress(final QualityOfServiceMode qosMode, final boolean persistent, final int batchSize) throws Exception
+   {
+      Connection connSource = null;
+
+      JMSBridgeImpl bridge = null;
+
+      Thread t = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         factInUse1 = cff1xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    qosMode,
+                                    batchSize,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         final int NUM_MESSAGES = 250;
+
+         StressSender sender = new StressSender();
+         sender.sess = sessSend;
+         sender.prod = prod;
+         sender.numMessages = NUM_MESSAGES;
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         t = new Thread(sender);
+
+         t.start();
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         t.join();
+
+         if (sender.ex != null)
+         {
+            // An error occurred during the send
+            throw sender.ex;
+         }
+
+      }
+      finally
+      {
+         if (t != null)
+         {
+            t.join(10000);
+         }
+
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               JMSBridgeTest.log.error("Failed to close connection", e);
+            }
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   private void testStressBatchTime(final QualityOfServiceMode qosMode, final boolean persistent, final int maxBatchTime) throws Exception
+   {
+      Connection connSource = null;
+
+      JMSBridgeImpl bridge = null;
+
+      Thread t = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         factInUse1 = cff1xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    qosMode,
+                                    2,
+                                    maxBatchTime,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         final int NUM_MESSAGES = 500;
+
+         StressSender sender = new StressSender();
+         sender.sess = sessSend;
+         sender.prod = prod;
+         sender.numMessages = NUM_MESSAGES;
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         t = new Thread(sender);
+
+         t.start();
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         t.join();
+
+         if (sender.ex != null)
+         {
+            // An error occurred during the send
+            throw sender.ex;
+         }
+
+      }
+      finally
+      {
+         if (t != null)
+         {
+            t.join(10000);
+         }
+
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               JMSBridgeTest.log.error("Failed to close connection", e);
+            }
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   // Both source and destination on same rm
+   private void testStressSameServer(final QualityOfServiceMode qosMode, final boolean persistent, final int batchSize) throws Exception
+   {
+      Connection connSource = null;
+
+      JMSBridgeImpl bridge = null;
+
+      Thread t = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+      }
+
+      try
+      {
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse0,
+                                    sourceQueueFactory,
+                                    localTargetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    qosMode,
+                                    batchSize,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         final int NUM_MESSAGES = 200;
+
+         StressSender sender = new StressSender();
+         sender.sess = sessSend;
+         sender.prod = prod;
+         sender.numMessages = NUM_MESSAGES;
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         t = new Thread(sender);
+
+         t.start();
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES, false);
+
+         t.join();
+
+         if (sender.ex != null)
+         {
+            // An error occurred during the send
+            throw sender.ex;
+         }
+
+      }
+      finally
+      {
+         if (t != null)
+         {
+            t.join(10000);
+         }
+
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               JMSBridgeTest.log.error("Failed to close connection", e);
+            }
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   private void testNoMaxBatchTime(final QualityOfServiceMode qosMode, final boolean persistent) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         factInUse1 = cff1xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    qosMode,
+                                    NUM_MESSAGES,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         // Send half the messages
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, false);
+
+         // Verify none are received
+
+         checkEmpty(targetQueue, 1);
+
+         // Send the other half
+
+         sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, false);
+
+         // This should now be receivable
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         // Send another batch with one more than batch size
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES + 1, persistent, false);
+
+         // Make sure only batch size are received
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+
+         // Final batch
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent, false);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, NUM_MESSAGES, 1, false);
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES - 1, false);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            JMSBridgeTest.log.info("Stopping bridge");
+            bridge.stop();
+         }
+      }
+   }
+
+   private void testNoMaxBatchTimeSameServer(final QualityOfServiceMode qosMode, final boolean persistent) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse0,
+                                    sourceQueueFactory,
+                                    localTargetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    qosMode,
+                                    NUM_MESSAGES,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, false);
+
+         checkEmpty(targetQueue, 1);
+
+         // Send the other half
+
+         sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, false);
+
+         // This should now be receivable
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES, false);
+
+         checkEmpty(localTargetQueue, 0);
+
+         checkEmpty(sourceQueue, 0);
+
+         // Send another batch with one more than batch size
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES + 1, persistent, false);
+
+         // Make sure only batch size are received
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES, false);
+
+         // Final batch
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent, false);
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, NUM_MESSAGES, 1, false);
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES - 1, false);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   private void testMaxBatchTime(final QualityOfServiceMode qosMode, final boolean persistent) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         factInUse1 = cff1xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+
+      try
+      {
+         final long MAX_BATCH_TIME = 3000;
+
+         final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
+
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    3000,
+                                    10,
+                                    qosMode,
+                                    MAX_BATCH_SIZE,
+                                    MAX_BATCH_TIME,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         final int NUM_MESSAGES = 10;
+
+         // Send some message
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, persistent, false);
+
+         // Verify none are received
+
+         checkEmpty(targetQueue, 1);
+
+         // Messages should now be receivable
+
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, false);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   private void testMaxBatchTimeSameServer(final QualityOfServiceMode qosMode, final boolean persistent) throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      ConnectionFactoryFactory factInUse0 = cff0;
+      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+      {
+         factInUse0 = cff0xa;
+         ServiceUtils.setTransactionManager(newTransactionManager());
+      }
+
+      try
+      {
+         final long MAX_BATCH_TIME = 3000;
+
+         final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
+
+         bridge = new JMSBridgeImpl(factInUse0,
+                                    factInUse0,
+                                    sourceQueueFactory,
+                                    localTargetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    3000,
+                                    10,
+                                    qosMode,
+                                    MAX_BATCH_SIZE,
+                                    MAX_BATCH_TIME,
+                                    null,
+                                    null,
+                                    false);
+
+         bridge.start();
+
+         final int NUM_MESSAGES = 10;
+
+         // Send some message
+
+         // Send some message
+
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, persistent, false);
+
+         // Verify none are received
+
+         checkEmpty(localTargetQueue, 0);
+
+         // Messages should now be receivable
+
+         checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES, false);
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   @Test
+   public void testSetTMClass() throws Exception
+   {
+      TransactionManagerLocatorImpl.tm = new DummyTransactionManager();
+
+      JMSBridgeImpl bridge = null;
+      try
+      {
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff0,
+                                    sourceQueueFactory,
+                                    localTargetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    3000,
+                                    10,
+                                    QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
+                                    10000,
+                                    3000,
+                                    null,
+                                    null,
+                                    false);
+         bridge.start();
+      }
+      finally
+      {
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+      }
+   }
+
+   @Test
+   public void testMBeanServer() throws Exception
+   {
+
+      MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+      ObjectName objectName = new ObjectName("example.jmsbridge:service=JMSBridge");
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
+                                               cff0,
+                                               sourceQueueFactory,
+                                               localTargetQueueFactory,
+                                               null,
+                                               null,
+                                               null,
+                                               null,
+                                               null,
+                                               5000,
+                                               10,
+                                               QualityOfServiceMode.AT_MOST_ONCE,
+                                               1,
+                                               -1,
+                                               null,
+                                               null,
+                                               false,
+                                               mbeanServer,
+                                               objectName.getCanonicalName());
+
+      Assert.assertTrue(mbeanServer.isRegistered(objectName));
+
+      bridge.destroy();
+
+      Assert.assertFalse(mbeanServer.isRegistered(objectName));
+   }
+
+   public TransactionManager getNewTm()
+   {
+      return newTransactionManager();
+   }
+
+   // Inner classes -------------------------------------------------------------------
+
+   private static class StressSender implements Runnable
+   {
+      int numMessages;
+
+      Session sess;
+
+      MessageProducer prod;
+
+      Exception ex;
+
+      public void run()
+      {
+         try
+         {
+            for (int i = 0; i < numMessages; i++)
+            {
+               TextMessage tm = sess.createTextMessage("message" + i);
+
+               prod.send(tm);
+
+               JMSBridgeTest.log.trace("Sent message " + i);
+            }
+         }
+         catch (Exception e)
+         {
+            JMSBridgeTest.log.error("Failed to send", e);
+            ex = e;
+         }
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
new file mode 100644
index 0000000..255741f
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.tests.extras.jms.bridge;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.activemq.service.extensions.transactions.TransactionManagerLocator;
+
+/**
+ * @author mtaylor
+ */
+
+public class TransactionManagerLocatorImpl implements TransactionManagerLocator
+{
+   public static TransactionManager tm = null;
+
+   @Override
+   public TransactionManager getTransactionManager()
+   {
+      return tm;
+   }
+
+   public void setTransactionManager(TransactionManager transactionManager)
+   {
+      tm = transactionManager;
+   }
+}