You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:40 UTC
[07/15] activemq-6 git commit: Refactored the testsuite a bit
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
deleted file mode 100644
index 31a094a..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/ClusteredBridgeTestBase.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.naming.Context;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.TxControl;
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClientConsumer;
-import org.apache.activemq.api.core.client.ClientMessage;
-import org.apache.activemq.api.core.client.ClientProducer;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.FailoverEventListener;
-import org.apache.activemq.api.core.client.FailoverEventType;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.core.config.Configuration;
-import org.apache.activemq.core.config.ha.ReplicaPolicyConfiguration;
-import org.apache.activemq.core.config.ha.ReplicatedPolicyConfiguration;
-import org.apache.activemq.core.registry.JndiBindingRegistry;
-import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
-import org.apache.activemq.core.server.ActiveMQServer;
-import org.apache.activemq.core.server.ActiveMQServers;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.JMSServerManager;
-import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.tests.unit.util.InVMContext;
-import org.apache.activemq.tests.util.ServiceTestBase;
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * A ClusteredBridgeTestBase
- * This class serves as a base class for jms bridge tests in
- * clustered scenarios.
- *
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- */
-public abstract class ClusteredBridgeTestBase extends ServiceTestBase
-{
- private static int index = 0;
-
- protected Map<String, ServerGroup> groups = new HashMap<String, ServerGroup>();
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
- Iterator<ServerGroup> iter = groups.values().iterator();
- while (iter.hasNext())
- {
- iter.next().start();
- }
- TxControl.enable();
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- Iterator<ServerGroup> iter = groups.values().iterator();
- while (iter.hasNext())
- {
- iter.next().stop();
- }
-
- TxControl.disable(true);
-
- TransactionReaper.terminate(false);
-
- super.tearDown();
- }
-
- //returns the live/backup pair registered under the given name, creating it on first use.
- protected ServerGroup createServerGroup(String name) throws Exception
- {
- ServerGroup server = groups.get(name);
- if (server == null)
- {
- server = new ServerGroup(name, groups.size());
- server.create();
- groups.put(name, server);
- }
- return server;
- }
-
- //each ServerGroup represents a live/backup pair
- protected class ServerGroup
- {
- private static final int ID_OFFSET = 100;
- private String name;
- private int id;
-
- private JMSServerManager liveNode;
- private JMSServerManager backupNode;
-
- private TransportConfiguration liveConnector;
- private TransportConfiguration backupConnector;
-
- private Context liveContext;
-
- private ServerLocator locator;
- private ClientSessionFactory sessionFactory;
-
- /**
- * @param name - name of the group
- * @param id - id of the live; keep it below 100, since the backup's server id is id + ID_OFFSET (100)
- */
- public ServerGroup(String name, int id)
- {
- this.name = name;
- this.id = id;
- }
-
- public String getName()
- {
- return name;
- }
-
- public void create() throws Exception
- {
- Map<String, Object> params0 = new HashMap<String, Object>();
- params0.put(TransportConstants.SERVER_ID_PROP_NAME, id);
- liveConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params0, "in-vm-live");
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.SERVER_ID_PROP_NAME, id + ID_OFFSET);
- backupConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params, "in-vm-backup");
-
- //live
- Configuration conf0 = createBasicConfig()
- .setJournalDirectory(getJournalDir(id, false))
- .setBindingsDirectory(getBindingsDir(id, false))
- .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0))
- .addConnectorConfiguration(liveConnector.getName(), liveConnector)
- .setHAPolicyConfiguration(new ReplicatedPolicyConfiguration())
- .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()));
-
- ActiveMQServer server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, true));
-
- liveContext = new InVMContext();
- liveNode = new JMSServerManagerImpl(server0);
- liveNode.setRegistry(new JndiBindingRegistry(liveContext));
-
- //backup
- Configuration conf = createBasicConfig()
- .setJournalDirectory(getJournalDir(id, true))
- .setBindingsDirectory(getBindingsDir(id, true))
- .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params))
- .addConnectorConfiguration(backupConnector.getName(), backupConnector)
- .addConnectorConfiguration(liveConnector.getName(), liveConnector)
- .setHAPolicyConfiguration(new ReplicaPolicyConfiguration())
- .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
-
- ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(conf, true));
-
- Context context = new InVMContext();
-
- backupNode = new JMSServerManagerImpl(backup);
- backupNode.setRegistry(new JndiBindingRegistry(context));
- }
-
- public void start() throws Exception
- {
- liveNode.start();
- waitForServer(liveNode.getActiveMQServer());
- backupNode.start();
- waitForRemoteBackupSynchronization(backupNode.getActiveMQServer());
-
- locator = ActiveMQClient.createServerLocatorWithHA(liveConnector);
- locator.setReconnectAttempts(-1);
- sessionFactory = locator.createSessionFactory();
- }
-
- public void stop() throws Exception
- {
- sessionFactory.close();
- locator.close();
- liveNode.stop();
- backupNode.stop();
- }
-
- public void createQueue(String queueName) throws Exception
- {
- liveNode.createQueue(true, queueName, null, true, "/queue/" + queueName);
- }
-
- public ConnectionFactoryFactory getConnectionFactoryFactory()
- {
- ConnectionFactoryFactory cff = new ConnectionFactoryFactory()
- {
- public ConnectionFactory createConnectionFactory() throws Exception
- {
- ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF,
- liveConnector);
- cf.getServerLocator().setReconnectAttempts(-1);
- return cf;
- }
- };
-
- return cff;
- }
-
- public DestinationFactory getDestinationFactory(final String queueName)
- {
-
- DestinationFactory destFactory = new DestinationFactory()
- {
- public Destination createDestination() throws Exception
- {
- return (Destination) liveContext.lookup("/queue/" + queueName);
- }
- };
- return destFactory;
- }
-
- public void sendMessages(String queueName, int num) throws ActiveMQException
- {
- ClientSession session = sessionFactory.createSession();
- ClientProducer producer = session.createProducer("jms.queue." + queueName);
- for (int i = 0; i < num; i++)
- {
- ClientMessage m = session.createMessage(true);
- m.putStringProperty("bridge-message", "hello " + index);
- index++;
- producer.send(m);
- }
- session.close();
- }
-
- public void receiveMessages(String queueName, int num, boolean checkDup) throws ActiveMQException
- {
- ClientSession session = sessionFactory.createSession();
- session.start();
- ClientConsumer consumer = session.createConsumer("jms.queue." + queueName);
- for (int i = 0; i < num; i++)
- {
- ClientMessage m = consumer.receive(30000);
- assertNotNull("i=" + i, m);
- assertNotNull(m.getStringProperty("bridge-message"));
- m.acknowledge();
- }
-
- ClientMessage m = consumer.receive(500);
- if (checkDup)
- {
- assertNull(m);
- }
- else
- {
- //drain messages
- while (m != null)
- {
- m = consumer.receive(200);
- }
- }
-
- session.close();
- }
-
- public void crashLive() throws Exception
- {
- final CountDownLatch latch = new CountDownLatch(1);
- sessionFactory.addFailoverListener(new FailoverEventListener()
- {
-
- @Override
- public void failoverEvent(FailoverEventType eventType)
- {
- if (eventType == FailoverEventType.FAILOVER_COMPLETED)
- {
- latch.countDown();
- }
- }
- });
-
- liveNode.getActiveMQServer().stop();
-
- boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
- assertTrue(ok);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
deleted file mode 100644
index d3baf43..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeClusteredTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.transaction.TransactionManager;
-
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * A JMSBridgeClusteredTest
- * <p/>
- * Tests of jms bridge using HA connection factories.
- *
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- */
-public class JMSBridgeClusteredTest extends ClusteredBridgeTestBase
-{
- private ServerGroup sourceServer;
- private ServerGroup targetServer;
-
- private String sourceQueueName = "SourceQueue";
- private String targetQueueName = "TargetQueue";
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- sourceServer = createServerGroup("source-server");
- targetServer = createServerGroup("target-server");
-
- sourceServer.start();
- targetServer.start();
-
- sourceServer.createQueue(sourceQueueName);
- targetServer.createQueue(targetQueueName);
- }
-
- @Test
- public void testBridgeOnFailoverXA() throws Exception
- {
- performSourceAndTargetCrashAndFailover(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
- }
-
- @Test
- public void testBridgeOnFailoverDupsOk() throws Exception
- {
- performSourceAndTargetCrashAndFailover(QualityOfServiceMode.DUPLICATES_OK);
- }
-
- @Test
- public void testBridgeOnFailoverAtMostOnce() throws Exception
- {
- performSourceAndTargetCrashAndFailover(QualityOfServiceMode.AT_MOST_ONCE);
- }
-
- @Test
- public void testCrashAndFailoverWithMessagesXA() throws Exception
- {
- performSourceAndTargetCrashAndFailoverWithMessages(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
- }
-
- //tests that messages are correctly bridged when failover happens during a batch send.
- //first send some messages and make sure the bridge doesn't forward them (below batch size),
- //then crash the live,
- //then send more messages,
- //then receive those messages, no more, no less.
- //this test is valid for ONCE_AND_ONLY_ONCE and AT_MOST_ONCE.
- //with DUPLICATES_OK the test fails because some messages are delivered again
- //after failover, which is fine as duplication is allowed in that mode.
- public void performSourceAndTargetCrashAndFailoverWithMessages(QualityOfServiceMode mode) throws Exception
- {
- JMSBridgeImpl bridge = null;
- TransactionManager txMgr = null;
-
- try
- {
- ConnectionFactoryFactory sourceCFF = sourceServer.getConnectionFactoryFactory();
- ConnectionFactoryFactory targetCFF = targetServer.getConnectionFactoryFactory();
- DestinationFactory sourceQueueFactory = sourceServer.getDestinationFactory(sourceQueueName);
- DestinationFactory targetQueueFactory = targetServer.getDestinationFactory(targetQueueName);
-
- //must be even: each send below is half a batch (batchSize / 2)
- final int batchSize = 4;
- bridge = new JMSBridgeImpl(sourceCFF,
- targetCFF,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- mode,
- batchSize,
- -1,
- null,
- null,
- false);
-
- txMgr = newTransactionManager();
- bridge.setTransactionManager(txMgr);
-
- //start the bridge
- bridge.start();
-
- System.out.println("started bridge");
-
- final int NUM_MESSAGES = batchSize / 2;
-
- //send some messages to source
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- //receive from target, no message should be received.
- receiveMessages(targetServer, targetQueueName, 0);
-
- //now crash target server
- targetServer.crashLive();
-
- //send more
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
-
- receiveMessages(targetServer, targetQueueName, batchSize);
-
- //send some again
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- //check no messages arrived.
- receiveMessages(targetServer, targetQueueName, 0);
- //now crash source server
- sourceServer.crashLive();
-
- //verify bridge still works
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- receiveMessages(targetServer, targetQueueName, batchSize);
- }
- finally
- {
- if (bridge != null)
- {
- bridge.stop();
- }
- }
- }
-
- /*
- * Deploys a bridge whose source and target queues are in
- * separate live/backup pairs. Both source and target CFs are HA.
- * Tests that the bridge keeps working when the live servers crash.
- */
- private void performSourceAndTargetCrashAndFailover(QualityOfServiceMode mode) throws Exception
- {
-
- JMSBridgeImpl bridge = null;
- TransactionManager txMgr = null;
-
- try
- {
- ConnectionFactoryFactory sourceCFF = sourceServer.getConnectionFactoryFactory();
- ConnectionFactoryFactory targetCFF = targetServer.getConnectionFactoryFactory();
- DestinationFactory sourceQueueFactory = sourceServer.getDestinationFactory(sourceQueueName);
- DestinationFactory targetQueueFactory = targetServer.getDestinationFactory(targetQueueName);
-
- bridge = new JMSBridgeImpl(sourceCFF,
- targetCFF,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- mode,
- 10,
- 1000,
- null,
- null,
- false);
-
- txMgr = newTransactionManager();
- bridge.setTransactionManager(txMgr);
-
- //start the bridge
- bridge.start();
-
- final int NUM_MESSAGES = 10;
-
- //send some messages to source
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- //receive from target
- receiveMessages(targetServer, targetQueueName, NUM_MESSAGES);
-
- //now crash target server
- targetServer.crashLive();
-
- //verify bridge still works
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- receiveMessages(targetServer, targetQueueName, NUM_MESSAGES);
-
- //now crash source server
- sourceServer.crashLive();
-
- //verify bridge still works
- sendMessages(sourceServer, sourceQueueName, NUM_MESSAGES);
- receiveMessages(targetServer, targetQueueName, NUM_MESSAGES, mode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
- }
- finally
- {
- if (bridge != null)
- {
- bridge.stop();
- }
- }
- }
-
- private void sendMessages(ServerGroup server, String queueName, int num) throws ActiveMQException
- {
- server.sendMessages(queueName, num);
- }
-
- private void receiveMessages(ServerGroup server, String queueName, int num, boolean checkDup) throws ActiveMQException
- {
- try
- {
- server.receiveMessages(queueName, num, checkDup);
- }
- catch (ActiveMQException e)
- {
- e.printStackTrace();
- throw e;
- }
- }
-
- private void receiveMessages(ServerGroup server, String queueName, int num) throws ActiveMQException
- {
- try
- {
- server.receiveMessages(queueName, num, false);
- }
- catch (ActiveMQException e)
- {
- e.printStackTrace();
- throw e;
- }
- }
-
- protected TransactionManager newTransactionManager()
- {
- return new TransactionManagerImple();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
deleted file mode 100644
index 752c8c5..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
-import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
-import org.apache.activemq.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.tests.integration.ra.DummyTransactionManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.xa.XAResource;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- */
-public class JMSBridgeReconnectionTest extends BridgeTestBase
-{
- /**
- * Pause, in milliseconds, that the tests sleep after stopping the destination server.
- */
- private static final int TIME_WAIT = 5000;
-
- private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
-
- // Crash and reconnect
-
- // Once and only once
-
- @Test
- public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, false);
- }
-
- @Test
- public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, true);
- }
-
- @Test
- public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, false);
- }
-
- // dups ok
-
- @Test
- public void testCrashAndReconnectDestBasic_DuplicatesOk_P() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, true, false);
- }
-
- @Test
- public void testCrashAndReconnectDestBasic_DuplicatesOk_NP() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, false, false);
- }
-
- // At most once
-
- @Test
- public void testCrashAndReconnectDestBasic_AtMostOnce_P() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, true, false);
- }
-
- @Test
- public void testCrashAndReconnectDestBasic_AtMostOnce_NP() throws Exception
- {
- performCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, false, false);
- }
-
- // Crash tests specific to XA transactions
-
- @Test
- public void testCrashAndReconnectDestCrashBeforePrepare_P() throws Exception
- {
- performCrashAndReconnectDestCrashBeforePrepare(true);
- }
-
- @Test
- public void testCrashAndReconnectDestCrashBeforePrepare_NP() throws Exception
- {
- performCrashAndReconnectDestCrashBeforePrepare(false);
- }
-
- // Crash before bridge is started
-
- @Test
- public void testRetryConnectionOnStartup() throws Exception
- {
- jmsServer1.stop();
-
- JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
- cff1,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- QualityOfServiceMode.DUPLICATES_OK,
- 10,
- -1,
- null,
- null,
- false);
- bridge.setTransactionManager(newTransactionManager());
- addActiveMQComponent(bridge);
- bridge.start();
- Assert.assertFalse(bridge.isStarted());
- Assert.assertTrue(bridge.isFailed());
-
- // Restart the server
- jmsServer1.start();
-
- createQueue("targetQueue", 1);
- setUpAdministeredObjects();
-
- Thread.sleep(3000);
-
- Assert.assertTrue(bridge.isStarted());
- Assert.assertFalse(bridge.isFailed());
- }
-
- /**
- * https://jira.jboss.org/jira/browse/HORNETQ-287
- */
- @Test
- public void testStopBridgeWithFailureWhenStarted() throws Exception
- {
- jmsServer1.stop();
-
- JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
- cff1,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 500,
- -1,
- QualityOfServiceMode.DUPLICATES_OK,
- 10,
- -1,
- null,
- null,
- false);
- bridge.setTransactionManager(newTransactionManager());
-
- bridge.start();
- Assert.assertFalse(bridge.isStarted());
- Assert.assertTrue(bridge.isFailed());
-
- bridge.stop();
- Assert.assertFalse(bridge.isStarted());
-
- // we restart and set up the server for the test's tearDown checks
- jmsServer1.start();
- createQueue("targetQueue", 1);
- setUpAdministeredObjects();
- }
-
- /*
- * Send some messages
- * Crash the destination server
- * Bring the destination server back up
- * Send some more messages
- * Verify all messages are received
- */
- private void performCrashAndReconnectDestBasic(final QualityOfServiceMode qosMode,
- final boolean persistent,
- final boolean largeMessage) throws Exception
- {
- JMSBridgeImpl bridge = null;
-
- ConnectionFactoryFactory factInUse0 = cff0;
- ConnectionFactoryFactory factInUse1 = cff1;
- if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
- {
- factInUse0 = cff0xa;
- factInUse1 = cff1xa;
- }
-
- bridge =
- new JMSBridgeImpl(factInUse0,
- factInUse1,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- qosMode,
- 10,
- -1,
- null,
- null,
- false);
- addActiveMQComponent(bridge);
- bridge.setTransactionManager(newTransactionManager());
- bridge.start();
-
- final int NUM_MESSAGES = 10;
-
- // Send some messages
-
- sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, largeMessage);
-
- // Verify none are received
-
- checkEmpty(targetQueue, 1);
-
- // Now crash the dest server
-
- JMSBridgeReconnectionTest.log.info("About to crash server");
-
- jmsServer1.stop();
-
- // Wait a while before starting up to simulate the dest being down for a while
- JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(TIME_WAIT);
- JMSBridgeReconnectionTest.log.info("Done wait");
-
- // Restart the server
- JMSBridgeReconnectionTest.log.info("Restarting server");
- jmsServer1.start();
-
- // jmsServer1.createQueue(false, "targetQueue", null, true, "queue/targetQueue");
-
- createQueue("targetQueue", 1);
-
- setUpAdministeredObjects();
-
- // Send some more messages
-
- JMSBridgeReconnectionTest.log.info("Sending more messages");
-
- sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, largeMessage);
-
- JMSBridgeReconnectionTest.log.info("Sent messages");
-
- jmsServer1.stop();
-
- bridge.stop();
-
- System.out.println("JMSBridgeReconnectionTest.performCrashAndReconnectDestBasic");
- }
-
- @Test
- public void performCrashDestinationStopBridge() throws Exception
- {
- ConnectionFactoryFactory factInUse0 = cff0;
- ConnectionFactoryFactory factInUse1 = cff1;
- final JMSBridgeImpl bridge =
- new JMSBridgeImpl(factInUse0,
- factInUse1,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- QualityOfServiceMode.DUPLICATES_OK,
- 10,
- -1,
- null,
- null,
- false);
-
-
- addActiveMQComponent(bridge);
- bridge.setTransactionManager(newTransactionManager());
- bridge.start();
-
- Thread clientThread = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- while (bridge.isStarted())
- {
- try
- {
- sendMessages(cf0, sourceQueue, 0, 1, false, false);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- });
-
- clientThread.start();
-
- checkAllMessageReceivedInOrder(cf1, targetQueue, 0, 1, false);
-
- JMSBridgeReconnectionTest.log.info("About to crash server");
-
- jmsServer1.stop();
-
- // Wait a while before starting up to simulate the dest being down for a while
- JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(TIME_WAIT);
- JMSBridgeReconnectionTest.log.info("Done wait");
-
- bridge.stop();
-
- clientThread.join(5000);
-
- assertTrue(!clientThread.isAlive());
- }
-
- @Test
- public void performCrashAndReconnect() throws Exception
- {
- performCrashAndReconnect(true);
- }
-
- @Test
- public void performCrashAndNoReconnect() throws Exception
- {
- performCrashAndReconnect(false);
- }
-
-
- private void performCrashAndReconnect(boolean restart) throws Exception
- {
- cff1xa = new ConnectionFactoryFactory()
- {
- public Object createConnectionFactory() throws Exception
- {
- ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY,
- params1));
-
- // NOTE(review): the original note said automatic reconnection is disabled here, but -1 below normally means unlimited attempts - confirm intent
- cf.setReconnectAttempts(-1);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
-
- return cf;
- }
-
- };
-
- DummyTransactionManager tm = new DummyTransactionManager();
- DummyTransaction tx = new DummyTransaction();
- tm.tx = tx;
-
- JMSBridgeImpl bridge =
- new JMSBridgeImpl(cff0xa,
- cff1xa,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
- 10,
- 5000,
- null,
- null,
- false);
- addActiveMQComponent(bridge);
- bridge.setTransactionManager(tm);
-
- bridge.start();
-
- // Now crash the dest server
-
- JMSBridgeReconnectionTest.log.info("About to crash server");
-
- jmsServer1.stop();
-
- if (restart)
- {
- jmsServer1.start();
- }
- // Wait a while before starting up to simulate the dest being down for a while
- JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(TIME_WAIT);
- JMSBridgeReconnectionTest.log.info("Done wait");
-
- bridge.stop();
-
- if (restart)
- {
- assertTrue(tx.rolledback);
- assertTrue(tx.targetConnected);
- }
- else
- {
- assertTrue(tx.rolledback);
- assertFalse(tx.targetConnected);
- }
- }
-
- private class DummyTransaction implements Transaction
- {
- boolean rolledback = false;
- ClientSession targetSession;
- boolean targetConnected = false;
- @Override
- public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException
- {
-
- }
-
- @Override
- public void rollback() throws IllegalStateException, SystemException
- {
- rolledback = true;
- targetConnected = !targetSession.isClosed();
- }
-
- @Override
- public void setRollbackOnly() throws IllegalStateException, SystemException
- {
-
- }
-
- @Override
- public int getStatus() throws SystemException
- {
- return 0;
- }
-
- @Override
- public boolean enlistResource(XAResource xaResource) throws RollbackException, IllegalStateException, SystemException
- {
- targetSession = (ClientSession) xaResource;
- return false;
- }
-
- @Override
- public boolean delistResource(XAResource xaResource, int i) throws IllegalStateException, SystemException
- {
- return false;
- }
-
- @Override
- public void registerSynchronization(Synchronization synchronization) throws RollbackException, IllegalStateException, SystemException
- {
-
- }
- }
- /*
- * Send some messages
- * Crash the destination server
- * Set the max batch time such that it will attempt to send the batch while the dest server is down
- * Bring up the destination server
- * Send some more messages
- * Verify all messages are received
- */
- private void performCrashAndReconnectDestCrashBeforePrepare(final boolean persistent) throws Exception
- {
- JMSBridgeImpl bridge =
- new JMSBridgeImpl(cff0xa,
- cff1xa,
- sourceQueueFactory,
- targetQueueFactory,
- null,
- null,
- null,
- null,
- null,
- 1000,
- -1,
- QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
- 10,
- 5000,
- null,
- null,
- false);
- addActiveMQComponent(bridge);
- bridge.setTransactionManager(newTransactionManager());
-
- bridge.start();
-
- final int NUM_MESSAGES = 10;
- // Send some messages
-
- sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent, false);
-
- // verify none are received
-
- checkEmpty(targetQueue, 1);
-
- // Now crash the dest server
-
- JMSBridgeReconnectionTest.log.info("About to crash server");
-
- jmsServer1.stop();
-
- // Wait a while before starting up to simulate the dest being down for a while
- JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(TIME_WAIT);
- JMSBridgeReconnectionTest.log.info("Done wait");
-
- // Restart the server
- jmsServer1.start();
-
- createQueue("targetQueue", 1);
-
- setUpAdministeredObjects();
-
- sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent, false);
-
- checkMessagesReceived(cf1, targetQueue, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false, false);
- }
-}