You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:40 UTC

[07/15] activemq-6 git commit: Refactored the testsuite a bit

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
deleted file mode 100644
index 31a094a..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.naming.Context;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.TxControl;
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClientConsumer;
-import org.apache.activemq.api.core.client.ClientMessage;
-import org.apache.activemq.api.core.client.ClientProducer;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.FailoverEventListener;
-import org.apache.activemq.api.core.client.FailoverEventType;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.core.config.Configuration;
-import org.apache.activemq.core.config.ha.ReplicaPolicyConfiguration;
-import org.apache.activemq.core.config.ha.ReplicatedPolicyConfiguration;
-import org.apache.activemq.core.registry.JndiBindingRegistry;
-import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
-import org.apache.activemq.core.server.ActiveMQServer;
-import org.apache.activemq.core.server.ActiveMQServers;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.JMSServerManager;
-import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.tests.unit.util.InVMContext;
-import org.apache.activemq.tests.util.ServiceTestBase;
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * A ClusteredBridgeTestBase
- * This class serves as a base class for jms bridge tests in
- * clustered scenarios.
- *
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- */
-public abstract class ClusteredBridgeTestBase extends ServiceTestBase
-{
-   private static int index = 0;
-
-   protected Map<String, ServerGroup> groups = new HashMap<String, ServerGroup>();
-
-   @Override
-   @Before
-   public void setUp() throws Exception
-   {
-      super.setUp();
-      Iterator<ServerGroup> iter = groups.values().iterator();
-      while (iter.hasNext())
-      {
-         iter.next().start();
-      }
-      TxControl.enable();
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception
-   {
-      Iterator<ServerGroup> iter = groups.values().iterator();
-      while (iter.hasNext())
-      {
-         iter.next().stop();
-      }
-
-      TxControl.disable(true);
-
-      TransactionReaper.terminate(false);
-
-      super.tearDown();
-   }
-
-   //create a live/backup pair.
-   protected ServerGroup createServerGroup(String name) throws Exception
-   {
-      ServerGroup server = groups.get(name);
-      if (server == null)
-      {
-         server = new ServerGroup(name, groups.size());
-         server.create();
-         groups.put(name, server);
-      }
-      return server;
-   }
-
-   //each ServerGroup represents a live/backup pair
-   protected class ServerGroup
-   {
-      private static final int ID_OFFSET = 100;
-      private String name;
-      private int id;
-
-      private JMSServerManager liveNode;
-      private JMSServerManager backupNode;
-
-      private TransportConfiguration liveConnector;
-      private TransportConfiguration backupConnector;
-
-      private Context liveContext;
-
-      private ServerLocator locator;
-      private ClientSessionFactory sessionFactory;
-
-      /**
-       * @param name - name of the group
-       * @param id   - id of the live (should be < 100)
-       */
-      public ServerGroup(String name, int id)
-      {
-         this.name = name;
-         this.id = id;
-      }
-
-      public String getName()
-      {
-         return name;
-      }
-
-      public void create() throws Exception
-      {
-         Map<String, Object> params0 = new HashMap<String, Object>();
-         params0.put(TransportConstants.SERVER_ID_PROP_NAME, id);
-         liveConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params0, "in-vm-live");
-
-         Map<String, Object> params = new HashMap<String, Object>();
-         params.put(TransportConstants.SERVER_ID_PROP_NAME, id + ID_OFFSET);
-         backupConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params, "in-vm-backup");
-
-         //live
-         Configuration conf0 = createBasicConfig()
-            .setJournalDirectory(getJournalDir(id, false))
-            .setBindingsDirectory(getBindingsDir(id, false))
-            .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0))
-            .addConnectorConfiguration(liveConnector.getName(), liveConnector)
-            .setHAPolicyConfiguration(new ReplicatedPolicyConfiguration())
-            .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()));
-
-         ActiveMQServer server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, true));
-
-         liveContext = new InVMContext();
-         liveNode = new JMSServerManagerImpl(server0);
-         liveNode.setRegistry(new JndiBindingRegistry(liveContext));
-
-         //backup
-         Configuration conf = createBasicConfig()
-            .setJournalDirectory(getJournalDir(id, true))
-            .setBindingsDirectory(getBindingsDir(id, true))
-            .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params))
-            .addConnectorConfiguration(backupConnector.getName(), backupConnector)
-            .addConnectorConfiguration(liveConnector.getName(), liveConnector)
-            .setHAPolicyConfiguration(new ReplicaPolicyConfiguration())
-            .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
-
-         ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(conf, true));
-
-         Context context = new InVMContext();
-
-         backupNode = new JMSServerManagerImpl(backup);
-         backupNode.setRegistry(new JndiBindingRegistry(context));
-      }
-
-      public void start() throws Exception
-      {
-         liveNode.start();
-         waitForServer(liveNode.getActiveMQServer());
-         backupNode.start();
-         waitForRemoteBackupSynchronization(backupNode.getActiveMQServer());
-
-         locator = ActiveMQClient.createServerLocatorWithHA(liveConnector);
-         locator.setReconnectAttempts(-1);
-         sessionFactory = locator.createSessionFactory();
-      }
-
-      public void stop() throws Exception
-      {
-         sessionFactory.close();
-         locator.close();
-         liveNode.stop();
-         backupNode.stop();
-      }
-
-      public void createQueue(String queueName) throws Exception
-      {
-         liveNode.createQueue(true, queueName, null, true, "/queue/" + queueName);
-      }
-
-      public ConnectionFactoryFactory getConnectionFactoryFactory()
-      {
-         ConnectionFactoryFactory cff = new ConnectionFactoryFactory()
-         {
-            public ConnectionFactory createConnectionFactory() throws Exception
-            {
-               ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF,
-                                                                                              liveConnector);
-               cf.getServerLocator().setReconnectAttempts(-1);
-               return cf;
-            }
-         };
-
-         return cff;
-      }
-
-      public DestinationFactory getDestinationFactory(final String queueName)
-      {
-
-         DestinationFactory destFactory = new DestinationFactory()
-         {
-            public Destination createDestination() throws Exception
-            {
-               return (Destination) liveContext.lookup("/queue/" + queueName);
-            }
-         };
-         return destFactory;
-      }
-
-      public void sendMessages(String queueName, int num) throws ActiveMQException
-      {
-         ClientSession session = sessionFactory.createSession();
-         ClientProducer producer = session.createProducer("jms.queue." + queueName);
-         for (int i = 0; i < num; i++)
-         {
-            ClientMessage m = session.createMessage(true);
-            m.putStringProperty("bridge-message", "hello " + index);
-            index++;
-            producer.send(m);
-         }
-         session.close();
-      }
-
-      public void receiveMessages(String queueName, int num, boolean checkDup) throws ActiveMQException
-      {
-         ClientSession session = sessionFactory.createSession();
-         session.start();
-         ClientConsumer consumer = session.createConsumer("jms.queue." + queueName);
-         for (int i = 0; i < num; i++)
-         {
-            ClientMessage m = consumer.receive(30000);
-            assertNotNull("i=" + i, m);
-            assertNotNull(m.getStringProperty("bridge-message"));
-            m.acknowledge();
-         }
-
-         ClientMessage m = consumer.receive(500);
-         if (checkDup)
-         {
-            assertNull(m);
-         }
-         else
-         {
-            //drain messages
-            while (m != null)
-            {
-               m = consumer.receive(200);
-            }
-         }
-
-         session.close();
-      }
-
-      public void crashLive() throws Exception
-      {
-         final CountDownLatch latch = new CountDownLatch(1);
-         sessionFactory.addFailoverListener(new FailoverEventListener()
-         {
-
-            @Override
-            public void failoverEvent(FailoverEventType eventType)
-            {
-               if (eventType == FailoverEventType.FAILOVER_COMPLETED)
-               {
-                  latch.countDown();
-               }
-            }
-         });
-
-         liveNode.getActiveMQServer().stop();
-
-         boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
-         assertTrue(ok);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
deleted file mode 100644
index d3baf43..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.transaction.TransactionManager;
-
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * A JMSBridgeClusteredTest
- * <p/>
- * Tests of jms bridge using HA connection factories.
- *
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- */
-public class JMSBridgeClusteredTest extends ClusteredBridgeTestBase
-{
-   private ServerGroup sourceServer;
-   private ServerGroup targetServer;
-
-   private String sourceQueueName = "SourceQueue";
-   private String targetQueueName = "TargetQueue";
-
-   @Override
-   @Before
-   public void setUp() throws Exception
-   {
-      super.setUp();
-
-      sourceServer = createServerGroup("source-server");
-      targetServer = createServerGroup("target-server");
-
-      sourceServer.start();
-      targetServer.start();
-
-      sourceServer.createQueue(sourceQueueName);
-      targetServer.createQueue(targetQueueName);
-   }
-
-   @Test
-   public void testBridgeOnFailoverXA() throws Exception
-   {
-      performSourceAndTargetCrashAndFailover(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
-   }
-
-   @Test
-   public void testBridgeOnFailoverDupsOk() throws Exception
-   {
-      performSourceAndTargetCrashAndFailover(QualityOfServiceMode.DUPLICATES_OK);
-   }
-
-   @Test
-   public void testBridgeOnFailoverAtMostOnce() throws Exception
-   {
-      performSourceAndTargetCrashAndFailover(QualityOfServiceMode.AT_MOST_ONCE);
-   }
-
-   @Test
-   public void testCrashAndFailoverWithMessagesXA() throws Exception
-   {
-      performSourceAndTargetCrashAndFailoverWithMessages(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
-   }
-
-   //test messages are correctly bridged when failover happens during a batch send.
-   //first send some messages, make sure bridge doesn't send it (below batch size)
-   //then crash the live
-   //then send more messages
-   //then receive those messages, no more, no less.
-   //this test are valid for ONCE_AND_ONLY_ONCE and AT_MOST_ONCE.
-   //with DUPS_OK the test failed because some messages are delivered again
-   //after failover, which is fine as in this mode duplication is allowed.
-   public void performSourceAndTargetCrashAndFailoverWithMessages(QualityOfServiceMode mode) throws Exception
-   {
-      JMSBridgeImpl bridge = null;
-      TransactionManager txMgr = null;
-
-      try
-      {
-         ConnectionFactoryFactory sourceCFF = sourceServer.getConnectionFactoryFactory();
-         ConnectionFactoryFactory targetCFF = targetServer.getConnectionFactoryFactory();
-         DestinationFactory sourceQueueFactory = sourceServer.getDestinationFactory(sourceQueueName);
-         DestinationFactory targetQueueFactory = targetServer.getDestinationFactory(targetQueueName);
-
-         //even number
-         final int batchSize = 4;
-         bridge = new JMSBridgeImpl(sourceCFF,
-                                    targetCFF,
-                                    sourceQueueFactory,
-                                    targetQueueFactory,
-                                    null,
-                                    null,
-                                    null,
-                                    null,
-                                    null,
-                                    1000,
-                                    -1,
-                                    mode,
-                                    batchSize,
-                                    -1,
-                                    null,
-                                    null,
-                                    false);
-
-         txMgr = newTransactionManager();
-         bridge.setTransactionManager(txMgr);
-
-         //start the bridge
-         bridge.start();
-
-         System.out.println("started bridge");
-
-         final int NUM_MESSAGES = batchSize / 2;
-
-         //send some messages to source
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         //receive from target, no message should be received.
-         receiveMessages(targetServer, targetQueueName, 0);
-
-         //now crash target server
-         targetServer.crashLive();
-
-         //send more
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-
-         receiveMessages(targetServer, targetQueueName, batchSize);
-
-         //send some again
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         //check no messages arrived.
-         receiveMessages(targetServer, targetQueueName, 0);
-         //now crash source server
-         sourceServer.crashLive();
-
-         //verify bridge still work
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         receiveMessages(targetServer, targetQueueName, batchSize);
-      }
-      finally
-      {
-         if (bridge != null)
-         {
-            bridge.stop();
-         }
-      }
-   }
-
-   /*
-    * Deploy a bridge, source and target queues are in
-    * separate live/backup pairs. Source and Target CF are ha.
-    * Test the bridge work when the live servers crash.
-    */
-   private void performSourceAndTargetCrashAndFailover(QualityOfServiceMode mode) throws Exception
-   {
-
-      JMSBridgeImpl bridge = null;
-      TransactionManager txMgr = null;
-
-      try
-      {
-         ConnectionFactoryFactory sourceCFF = sourceServer.getConnectionFactoryFactory();
-         ConnectionFactoryFactory targetCFF = targetServer.getConnectionFactoryFactory();
-         DestinationFactory sourceQueueFactory = sourceServer.getDestinationFactory(sourceQueueName);
-         DestinationFactory targetQueueFactory = targetServer.getDestinationFactory(targetQueueName);
-
-         bridge = new JMSBridgeImpl(sourceCFF,
-                                    targetCFF,
-                                    sourceQueueFactory,
-                                    targetQueueFactory,
-                                    null,
-                                    null,
-                                    null,
-                                    null,
-                                    null,
-                                    1000,
-                                    -1,
-                                    mode,
-                                    10,
-                                    1000,
-                                    null,
-                                    null,
-                                    false);
-
-         txMgr = newTransactionManager();
-         bridge.setTransactionManager(txMgr);
-
-         //start the bridge
-         bridge.start();
-
-         final int NUM_MESSAGES = 10;
-
-         //send some messages to source
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         //receive from target
-         receiveMessages(targetServer, targetQueueName, NUM_MESSAGES);
-
-         //now crash target server
-         targetServer.crashLive();
-
-         //verify bridge still works
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         receiveMessages(targetServer, targetQueueName, NUM_MESSAGES);
-
-         //now crash source server
-         sourceServer.crashLive();
-
-         //verify bridge still work
-         sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-         receiveMessages(targetServer, targetQueueName, NUM_MESSAGES, mode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
-      }
-      finally
-      {
-         if (bridge != null)
-         {
-            bridge.stop();
-         }
-      }
-   }
-
-   private void sendMessages(ServerGroup server, String queueName, int num) throws ActiveMQException
-   {
-      server.sendMessages(queueName, num);
-   }
-
-   private void receiveMessages(ServerGroup server, String queueName, int num, boolean checkDup) throws ActiveMQException
-   {
-      try
-      {
-         server.receiveMessages(queueName, num, checkDup);
-      }
-      catch (ActiveMQException e)
-      {
-         e.printStackTrace();
-         throw e;
-      }
-   }
-
-   private void receiveMessages(ServerGroup server, String queueName, int num) throws ActiveMQException
-   {
-      try
-      {
-         server.receiveMessages(queueName, num, false);
-      }
-      catch (ActiveMQException e)
-      {
-         e.printStackTrace();
-         throw e;
-      }
-   }
-
-   protected TransactionManager newTransactionManager()
-   {
-      return new TransactionManagerImple();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
deleted file mode 100644
index 752c8c5..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
-import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
-import org.apache.activemq.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.tests.integration.ra.DummyTransactionManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.xa.XAResource;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- */
-public class JMSBridgeReconnectionTest extends BridgeTestBase
-{
-   /**
-    *
-    */
-   private static final int TIME_WAIT = 5000;
-
-   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
-
-   // Crash and reconnect
-
-   // Once and only once
-
-   @Test
-   public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, false);
-   }
-
-   @Test
-   public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, true);
-   }
-
-   @Test
-   public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, false);
-   }
-
-   // dups ok
-
-   @Test
-   public void testCrashAndReconnectDestBasic_DuplicatesOk_P() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, true, false);
-   }
-
-   @Test
-   public void testCrashAndReconnectDestBasic_DuplicatesOk_NP() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, false, false);
-   }
-
-   // At most once
-
-   @Test
-   public void testCrashAndReconnectDestBasic_AtMostOnce_P() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, true, false);
-   }
-
-   @Test
-   public void testCrashAndReconnectDestBasic_AtMostOnce_NP() throws Exception
-   {
-      performCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, false, false);
-   }
-
-   // Crash tests specific to XA transactions
-
-   @Test
-   public void testCrashAndReconnectDestCrashBeforePrepare_P() throws Exception
-   {
-      performCrashAndReconnectDestCrashBeforePrepare(true);
-   }
-
-   @Test
-   public void testCrashAndReconnectDestCrashBeforePrepare_NP() throws Exception
-   {
-      performCrashAndReconnectDestCrashBeforePrepare(false);
-   }
-
-   // Crash before bridge is started
-
-   @Test
-   public void testRetryConnectionOnStartup() throws Exception
-   {
-      jmsServer1.stop();
-
-      JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
-                                               cff1,
-                                               sourceQueueFactory,
-                                               targetQueueFactory,
-                                               null,
-                                               null,
-                                               null,
-                                               null,
-                                               null,
-                                               1000,
-                                               -1,
-                                               QualityOfServiceMode.DUPLICATES_OK,
-                                               10,
-                                               -1,
-                                               null,
-                                               null,
-                                               false);
-      bridge.setTransactionManager(newTransactionManager());
-      addActiveMQComponent(bridge);
-      bridge.start();
-      Assert.assertFalse(bridge.isStarted());
-      Assert.assertTrue(bridge.isFailed());
-
-      // Restart the server
-      jmsServer1.start();
-
-      createQueue("targetQueue", 1);
-      setUpAdministeredObjects();
-
-      Thread.sleep(3000);
-
-      Assert.assertTrue(bridge.isStarted());
-      Assert.assertFalse(bridge.isFailed());
-   }
-
-   /**
-    * https://jira.jboss.org/jira/browse/HORNETQ-287
-    */
-   @Test
-   public void testStopBridgeWithFailureWhenStarted() throws Exception
-   {
-      jmsServer1.stop();
-
-      JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
-                                               cff1,
-                                               sourceQueueFactory,
-                                               targetQueueFactory,
-                                               null,
-                                               null,
-                                               null,
-                                               null,
-                                               null,
-                                               500,
-                                               -1,
-                                               QualityOfServiceMode.DUPLICATES_OK,
-                                               10,
-                                               -1,
-                                               null,
-                                               null,
-                                               false);
-      bridge.setTransactionManager(newTransactionManager());
-
-      bridge.start();
-      Assert.assertFalse(bridge.isStarted());
-      Assert.assertTrue(bridge.isFailed());
-
-      bridge.stop();
-      Assert.assertFalse(bridge.isStarted());
-
-      // we restart and setup the server for the test's tearDown checks
-      jmsServer1.start();
-      createQueue("targetQueue", 1);
-      setUpAdministeredObjects();
-   }
-
-   /*
-    * Send some messages
-    * Crash the destination server
-    * Bring the destination server back up
-    * Send some more messages
-    * Verify all messages are received
-    */
-   private void performCrashAndReconnectDestBasic(final QualityOfServiceMode qosMode,
-                                                  final boolean persistent,
-                                                  final boolean largeMessage) throws Exception
-   {
-      JMSBridgeImpl bridge = null;
-
-      ConnectionFactoryFactory factInUse0 = cff0;
-      ConnectionFactoryFactory factInUse1 = cff1;
-      if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
-      {
-         factInUse0 = cff0xa;
-         factInUse1 = cff1xa;
-      }
-
-      bridge =
-         new JMSBridgeImpl(factInUse0,
-                           factInUse1,
-                           sourceQueueFactory,
-                           targetQueueFactory,
-                           null,
-                           null,
-                           null,
-                           null,
-                           null,
-                           1000,
-                           -1,
-                           qosMode,
-                           10,
-                           -1,
-                           null,
-                           null,
-                           false);
-      addActiveMQComponent(bridge);
-      bridge.setTransactionManager(newTransactionManager());
-      bridge.start();
-
-      final int NUM_MESSAGES = 10;
-
-      // Send some messages
-
-      sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, largeMessage);
-
-      // Verify none are received
-
-      checkEmpty(targetQueue, 1);
-
-      // Now crash the dest server
-
-      JMSBridgeReconnectionTest.log.info("About to crash server");
-
-      jmsServer1.stop();
-
-      // Wait a while before starting up to simulate the dest being down for a while
-      JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
-      Thread.sleep(TIME_WAIT);
-      JMSBridgeReconnectionTest.log.info("Done wait");
-
-      // Restart the server
-      JMSBridgeReconnectionTest.log.info("Restarting server");
-      jmsServer1.start();
-
-      // jmsServer1.createQueue(false, "targetQueue", null, true, "queue/targetQueue");
-
-      createQueue("targetQueue", 1);
-
-      setUpAdministeredObjects();
-
-      // Send some more messages
-
-      JMSBridgeReconnectionTest.log.info("Sending more messages");
-
-      sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, largeMessage);
-
-      JMSBridgeReconnectionTest.log.info("Sent messages");
-
-      jmsServer1.stop();
-
-      bridge.stop();
-
-      System.out.println("JMSBridgeReconnectionTest.performCrashAndReconnectDestBasic");
-   }
-
-   @Test
-   public void performCrashDestinationStopBridge() throws Exception
-   {
-      ConnectionFactoryFactory factInUse0 = cff0;
-      ConnectionFactoryFactory factInUse1 = cff1;
-      final JMSBridgeImpl bridge =
-            new JMSBridgeImpl(factInUse0,
-                  factInUse1,
-                  sourceQueueFactory,
-                  targetQueueFactory,
-                  null,
-                  null,
-                  null,
-                  null,
-                  null,
-                  1000,
-                  -1,
-                  QualityOfServiceMode.DUPLICATES_OK,
-                  10,
-                  -1,
-                  null,
-                  null,
-                  false);
-
-
-      addActiveMQComponent(bridge);
-      bridge.setTransactionManager(newTransactionManager());
-      bridge.start();
-
-      Thread clientThread = new Thread(new Runnable()
-      {
-         @Override
-         public void run()
-         {
-            while (bridge.isStarted())
-            {
-               try
-               {
-                  sendMessages(cf0, sourceQueue, 0, 1, false, false);
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace();
-               }
-            }
-         }
-      });
-
-      clientThread.start();
-
-      checkAllMessageReceivedInOrder(cf1, targetQueue, 0, 1, false);
-
-      JMSBridgeReconnectionTest.log.info("About to crash server");
-
-      jmsServer1.stop();
-
-      // Wait a while before starting up to simulate the dest being down for a while
-      JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
-      Thread.sleep(TIME_WAIT);
-      JMSBridgeReconnectionTest.log.info("Done wait");
-
-      bridge.stop();
-
-      clientThread.join(5000);
-
-      assertTrue(!clientThread.isAlive());
-   }
-
-   @Test
-   public void performCrashAndReconnect() throws Exception
-   {
-      performCrashAndReconnect(true);
-   }
-
-   @Test
-   public void performCrashAndNoReconnect() throws Exception
-   {
-      performCrashAndReconnect(false);
-   }
-
-
-   private void performCrashAndReconnect(boolean restart) throws Exception
-   {
-      cff1xa = new ConnectionFactoryFactory()
-      {
-         public Object createConnectionFactory() throws Exception
-         {
-            ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF,
-                                                                                                                         new TransportConfiguration(
-                                                                                                                            INVM_CONNECTOR_FACTORY,
-                                                                                                                            params1));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(-1);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-
-            return cf;
-         }
-
-      };
-
-      DummyTransactionManager tm = new DummyTransactionManager();
-      DummyTransaction tx = new DummyTransaction();
-      tm.tx = tx;
-
-      JMSBridgeImpl bridge =
-            new JMSBridgeImpl(cff0xa,
-                  cff1xa,
-                  sourceQueueFactory,
-                  targetQueueFactory,
-                  null,
-                  null,
-                  null,
-                  null,
-                  null,
-                  1000,
-                  -1,
-                  QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
-                  10,
-                  5000,
-                  null,
-                  null,
-                  false);
-      addActiveMQComponent(bridge);
-      bridge.setTransactionManager(tm);
-
-      bridge.start();
-
-      // Now crash the dest server
-
-      JMSBridgeReconnectionTest.log.info("About to crash server");
-
-      jmsServer1.stop();
-
-      if (restart)
-      {
-         jmsServer1.start();
-      }
-      // Wait a while before starting up to simulate the dest being down for a while
-      JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
-      Thread.sleep(TIME_WAIT);
-      JMSBridgeReconnectionTest.log.info("Done wait");
-
-      bridge.stop();
-
-      if (restart)
-      {
-         assertTrue(tx.rolledback);
-         assertTrue(tx.targetConnected);
-      }
-      else
-      {
-         assertTrue(tx.rolledback);
-         assertFalse(tx.targetConnected);
-      }
-   }
-
-   private class DummyTransaction implements Transaction
-   {
-      boolean rolledback = false;
-      ClientSession targetSession;
-      boolean targetConnected = false;
-      @Override
-      public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException
-      {
-
-      }
-
-      @Override
-      public void rollback() throws IllegalStateException, SystemException
-      {
-         rolledback = true;
-         targetConnected = !targetSession.isClosed();
-      }
-
-      @Override
-      public void setRollbackOnly() throws IllegalStateException, SystemException
-      {
-
-      }
-
-      @Override
-      public int getStatus() throws SystemException
-      {
-         return 0;
-      }
-
-      @Override
-      public boolean enlistResource(XAResource xaResource) throws RollbackException, IllegalStateException, SystemException
-      {
-         targetSession = (ClientSession) xaResource;
-         return false;
-      }
-
-      @Override
-      public boolean delistResource(XAResource xaResource, int i) throws IllegalStateException, SystemException
-      {
-         return false;
-      }
-
-      @Override
-      public void registerSynchronization(Synchronization synchronization) throws RollbackException, IllegalStateException, SystemException
-      {
-
-      }
-   }
-   /*
-    * Send some messages
-    * Crash the destination server
-    * Set the max batch time such that it will attempt to send the batch while the dest server is down
-    * Bring up the destination server
-    * Send some more messages
-    * Verify all messages are received
-    */
-   private void performCrashAndReconnectDestCrashBeforePrepare(final boolean persistent) throws Exception
-   {
-      JMSBridgeImpl bridge =
-         new JMSBridgeImpl(cff0xa,
-                           cff1xa,
-                           sourceQueueFactory,
-                           targetQueueFactory,
-                           null,
-                           null,
-                           null,
-                           null,
-                           null,
-                           1000,
-                           -1,
-                           QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
-                           10,
-                           5000,
-                           null,
-                           null,
-                           false);
-      addActiveMQComponent(bridge);
-      bridge.setTransactionManager(newTransactionManager());
-
-      bridge.start();
-
-      final int NUM_MESSAGES = 10;
-      // Send some messages
-
-      sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, false);
-
-      // verify none are received
-
-      checkEmpty(targetQueue, 1);
-
-      // Now crash the dest server
-
-      JMSBridgeReconnectionTest.log.info("About to crash server");
-
-      jmsServer1.stop();
-
-      // Wait a while before starting up to simulate the dest being down for a while
-      JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
-      Thread.sleep(TIME_WAIT);
-      JMSBridgeReconnectionTest.log.info("Done wait");
-
-      // Restart the server
-      jmsServer1.start();
-
-      createQueue("targetQueue", 1);
-
-      setUpAdministeredObjects();
-
-      sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, false);
-
-      checkMessagesReceived(cf1, targetQueue, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false, false);
-   }
-}