You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:41 UTC
[08/15] activemq-6 git commit: Refactored the testsuite a bit
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
new file mode 100644
index 0000000..4847e34
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
@@ -0,0 +1,2142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.jms.xa;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.tests.extras.ExtrasTestLogger;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.jboss.tm.TxUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * A XATestBase
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ *
+ */
+public class XATest extends JMSTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected TransactionManager tm;
+
+ protected Transaction suspendedTx;
+
+ protected XAConnectionFactory xacf;
+
+ protected Queue queue1;
+
+ // Constructors --------------------------------------------------
+
+ // TestCase overrides -------------------------------------------
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test");
+
+ queue1 = createQueue("queue1");
+ TxControl.enable();
+
+ tm = new TransactionManagerImple();
+
+ Assert.assertTrue(tm instanceof TransactionManagerImple);
+
+ suspendedTx = tm.suspend();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception
+ {
+ if (TxUtils.isUncommitted(tm))
+ {
+ // roll it back
+ try
+ {
+ tm.rollback();
+ }
+ catch (Throwable ignore)
+ {
+ // The connection will probably be closed so this may well throw an exception
+ }
+ }
+ if (tm.getTransaction() != null)
+ {
+ Transaction tx = tm.suspend();
+ if (tx != null)
+ {
+ ExtrasTestLogger.LOGGER.warn("Transaction still associated with thread " + tx +
+ " at status " +
+ TxUtils.getStatusAsString(tx.getStatus()));
+ }
+ }
+
+ if (suspendedTx != null)
+ {
+ tm.resume(suspendedTx);
+ }
+
+ TxControl.disable(true);
+
+ TransactionReaper.terminate(false);
+
+ super.tearDown();
+
+ }
+
+ // Public --------------------------------------------------------
+
+ @Test
+ public void test2PCSendCommit1PCOptimization() throws Exception
+ {
+ // Since both resources have same RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageProducer prod = sess.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message m = sess.createTextMessage("XATest1");
+ prod.send(m);
+ m = sess.createTextMessage("XATest2");
+ prod.send(m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceiver.createConsumer(queue1);
+ TextMessage m2 = (TextMessage)cons.receive(1000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(1000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void test2PCSendCommit() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+
+ XAResource res = sess.getXAResource();
+ XAResource res2 = new DummyXAResource();
+
+ // To prevent 1PC optimization being used
+ // res.setForceNotSameRM(true);
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(res2);
+
+ MessageProducer prod = sess.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message m = sess.createTextMessage("XATest1");
+ prod.send(m);
+ m = sess.createTextMessage("XATest2");
+ prod.send(m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceiver.createConsumer(queue1);
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void test2PCSendFailOnPrepare() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ // prevent 1Pc optimisation
+ // res.setForceNotSameRM(true);
+
+ XAResource res2 = new DummyXAResource(true);
+ XAResource res3 = new DummyXAResource();
+ XAResource res4 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+ tx.enlistResource(res3);
+ tx.enlistResource(res4);
+
+ MessageProducer prod = sess.createProducer(null);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message m = sess.createTextMessage("XATest1");
+ prod.send(queue1, m);
+ m = sess.createTextMessage("XATest2");
+ prod.send(queue1, m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+ tx.delistResource(res3, XAResource.TMSUCCESS);
+ tx.delistResource(res4, XAResource.TMSUCCESS);
+
+ try
+ {
+ tm.commit();
+
+ Assert.fail("should not get here");
+ }
+ catch (Exception e)
+ {
+ // We should expect this
+ }
+
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceiver.createConsumer(queue1);
+ Message m2 = cons.receive(100);
+ Assert.assertNull(m2);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void test2PCSendRollback() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ // prevent 1Pc optimisation
+ // res.setForceNotSameRM(true);
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageProducer prod = sess.createProducer(null);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message m = sess.createTextMessage("XATest1");
+ prod.send(queue1, m);
+ m = sess.createTextMessage("XATest2");
+ prod.send(queue1, m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceiver.createConsumer(queue1);
+ Message m2 = cons.receive(100);
+ Assert.assertNull(m2);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void test2PCReceiveCommit1PCOptimization() throws Exception
+ {
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+
+ m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ Message m3 = cons.receive(100);
+
+ Assert.assertNull(m3);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void test2PCReceiveCommit() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+ // res.setForceNotSameRM(true);
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+
+ m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ Message m3 = cons.receive(100);
+
+ Assert.assertNull(m3);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void test2PCReceiveRollback1PCOptimization() throws Exception
+ {
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ // Message should be redelivered
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ TextMessage m3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest1", m3.getText());
+ m3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest2", m3.getText());
+
+ Assert.assertTrue(m3.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void test2PCReceiveRollback() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+ // res.setForceNotSameRM(true);
+
+ XAResource res2 = new DummyXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ // Message should be redelivered
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+ tx.enlistResource(res2);
+
+ TextMessage m3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest1", m3.getText());
+ m3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest2", m3.getText());
+
+ Assert.assertTrue(m3.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void test1PCSendCommit() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ MessageProducer prod = sess.createProducer(null);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Message m = sess.createTextMessage("XATest1");
+ prod.send(queue1, m);
+ m = sess.createTextMessage("XATest2");
+ prod.send(queue1, m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceiver.createConsumer(queue1);
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void test1PCReceiveCommit() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ conn2.start();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+ m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ Message m3 = cons.receive(100);
+
+ Assert.assertNull(m3);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tm.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void test1PCReceiveRollback() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("XATest1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("XATest2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess = conn.createXASession();
+ XAResource res = sess.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest1", m2.getText());
+
+ m2 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("XATest2", m2.getText());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ // Message should be redelivered
+
+ // New tx
+ tm.begin();
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ TextMessage m3 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest1", m3.getText());
+
+ m3 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(m3);
+ Assert.assertEquals("XATest2", m3.getText());
+
+ Assert.assertTrue(m3.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tm.commit();
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxCommitAcknowledge1PCOptimization() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ XAResource res2 = sess2.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Receive the messages, one on each consumer
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ cons1.close();
+
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish2", r2.getText());
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // commit
+ tm.commit();
+
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r3 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r3);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxCommitAcknowledge() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+ res1.setForceNotSameRM(true);
+ res2.setForceNotSameRM(true);
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Receive the messages, one on each consumer
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ cons1.close();
+
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish2", r2.getText());
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // commit
+ tm.commit();
+
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r3 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r3);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxRollbackAcknowledge1PCOptimization() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish3");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish4");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Receive the messages, two on each consumer
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish2", r1.getText());
+
+ cons1.close();
+
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish3", r2.getText());
+
+ r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish4", r2.getText());
+
+ cons2.close();
+
+ // rollback
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ // Rollback causes cancel which is asynch
+ Thread.sleep(1000);
+
+ // We cannot assume anything about the order in which the transaction manager rollsback
+ // the sessions - this is implementation dependent
+
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r);
+
+ boolean session1First = false;
+
+ if (r.getText().equals("jellyfish1"))
+ {
+ session1First = true;
+ }
+ else if (r.getText().equals("jellyfish3"))
+ {
+ session1First = false;
+ }
+ else
+ {
+ Assert.fail("Unexpected message");
+ }
+
+ if (session1First)
+ {
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish2", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish3", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish4", r.getText());
+
+ }
+ else
+ {
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish4", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish1", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish2", r.getText());
+ }
+
+ r = (TextMessage)cons.receive(100);
+
+ Assert.assertNull(r);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxRollbackAcknowledge() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish3");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish4");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+ res1.setForceNotSameRM(true);
+ res2.setForceNotSameRM(true);
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Receive the messages, two on each consumer
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish2", r1.getText());
+
+ cons1.close();
+
+ // Cancel is asynch
+ Thread.sleep(500);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish3", r2.getText());
+
+ r2 = (TextMessage)cons2.receive(5000);
+
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish4", r2.getText());
+
+ // rollback
+
+ cons2.close();
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ // Rollback causes cancel which is asynch
+ Thread.sleep(1000);
+
+ // We cannot assume anything about the order in which the transaction manager rollsback
+ // the sessions - this is implementation dependent
+
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r);
+
+ boolean session1First = false;
+
+ if (r.getText().equals("jellyfish1"))
+ {
+ session1First = true;
+ }
+ else if (r.getText().equals("jellyfish3"))
+ {
+ session1First = false;
+ }
+ else
+ {
+ Assert.fail("Unexpected message");
+ }
+
+ if (session1First)
+ {
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish2", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish3", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish4", r.getText());
+
+ }
+ else
+ {
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish4", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish1", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish2", r.getText());
+ }
+
+ r = (TextMessage)cons.receive(100);
+
+ Assert.assertNull(r);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxRollbackAcknowledgeForceFailureInCommit() throws Exception
+ {
+ XAConnection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ // First send 4 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish3");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish4");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+ DummyXAResource res2 = new DummyXAResource(true);
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish2", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish3", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(5000);
+
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish4", r1.getText());
+
+ r1 = (TextMessage)cons1.receive(100);
+
+ Assert.assertNull(r1);
+
+ cons1.close();
+
+ // try and commit - and we're going to make the dummyxaresource throw an exception on commit,
+ // which should cause rollback to be called on the other resource
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // rollback will cause an attempt to deliver messages locally to the original consumers.
+ // the original consumer has closed, so it will cancelled to the server
+ // the server cancel is asynch, so we need to sleep for a bit to make sure it completes
+ ExtrasTestLogger.LOGGER.trace("Forcing failure");
+ try
+ {
+ tm.commit();
+ Assert.fail("should not get here");
+ }
+ catch (Exception e)
+ {
+ // We should expect this
+ }
+
+ Thread.sleep(1000);
+
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish1", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish2", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish3", r.getText());
+
+ r = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r);
+
+ Assert.assertEquals("jellyfish4", r.getText());
+
+ r = (TextMessage)cons.receive(100);
+
+ Assert.assertNull(r);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception
+ {
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ XAResource res2 = sess2.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Send 2 messages - one from each session
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ MessageProducer prod2 = sess2.createProducer(queue1);
+
+ prod1.send(sess1.createTextMessage("echidna1"));
+ prod2.send(sess2.createTextMessage("echidna2"));
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // commit
+ tm.commit();
+
+ // Messages should be in queue
+
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r1 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("echidna1", r1.getText());
+
+ TextMessage r2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("echidna2", r2.getText());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxCommitSend() throws Exception
+ {
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+ res1.setForceNotSameRM(true);
+ res2.setForceNotSameRM(true);
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Send 2 messages - one from each session
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ MessageProducer prod2 = sess2.createProducer(queue1);
+
+ prod1.send(sess1.createTextMessage("echidna1"));
+ prod2.send(sess2.createTextMessage("echidna2"));
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // commit
+ tm.commit();
+
+ // Messages should be in queue
+
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r1 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("echidna1", r1.getText());
+
+ TextMessage r2 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("echidna2", r2.getText());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ }
+
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception
+ {
+ // Since both resources have some RM, TM will probably use 1PC optimization
+
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ XAResource res2 = sess2.getXAResource();
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Send 2 messages - one from each session
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ MessageProducer prod2 = sess2.createProducer(queue1);
+
+ prod1.send(sess1.createTextMessage("echidna1"));
+ prod2.send(sess2.createTextMessage("echidna2"));
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // rollback
+ tm.rollback();
+
+ // Messages should not be in queue
+
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r1 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r1);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleSessionsOneTxRollbackSend() throws Exception
+ {
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+
+ conn = xacf.createXAConnection();
+ conn.start();
+
+ tm.begin();
+
+ // Create 2 sessions and enlist them
+ XASession sess1 = conn.createXASession();
+ ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+ XASession sess2 = conn.createXASession();
+ ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+ res1.setForceNotSameRM(true);
+ res2.setForceNotSameRM(true);
+
+ Transaction tx = tm.getTransaction();
+ tx.enlistResource(res1);
+ tx.enlistResource(res2);
+
+ // Send 2 messages - one from each session
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ MessageProducer prod2 = sess2.createProducer(queue1);
+
+ prod1.send(sess1.createTextMessage("echidna1"));
+ prod2.send(sess2.createTextMessage("echidna2"));
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+ tx.delistResource(res2, XAResource.TMSUCCESS);
+
+ // rollback
+ tm.rollback();
+
+ // Messages should not be in queue
+
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sess.createConsumer(queue1);
+ conn2.start();
+
+ TextMessage r1 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r1);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception
+ {
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+
+ // Create a session
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+
+ conn.start();
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ tm.begin();
+
+ Transaction tx1 = tm.getTransaction();
+ tx1.enlistResource(res1);
+
+ // Receive one message in one tx
+
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+ // suspend the tx
+ Transaction suspended = tm.suspend();
+
+ tm.begin();
+
+ Transaction tx2 = tm.getTransaction();
+ tx2.enlistResource(res1);
+
+ // Receive 2nd message in a different tx
+ TextMessage r2 = (TextMessage)cons1.receive(5000);
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish2", r2.getText());
+
+ tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+ // commit this transaction
+ tm.commit();
+
+ // verify that no messages are available
+ conn2.close();
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn2.start();
+ MessageConsumer cons = sess.createConsumer(queue1);
+ TextMessage r3 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r3);
+
+ // now resume the first tx and then commit it
+ tm.resume(suspended);
+
+ tm.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testOneSessionTwoTransactionsRollbackAcknowledge() throws Exception
+ {
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+ // First send 2 messages
+ conn2 = cf.createConnection();
+ Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProducer.createProducer(queue1);
+ Message m = sessProducer.createTextMessage("jellyfish1");
+ prod.send(m);
+ m = sessProducer.createTextMessage("jellyfish2");
+ prod.send(m);
+
+ conn = xacf.createXAConnection();
+
+ // Create a session
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+
+ conn.start();
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ tm.begin();
+
+ Transaction tx1 = tm.getTransaction();
+ tx1.enlistResource(res1);
+
+ // Receive one message in one tx
+
+ TextMessage r1 = (TextMessage)cons1.receive(5000);
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("jellyfish1", r1.getText());
+
+ tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+ // suspend the tx
+ Transaction suspended = tm.suspend();
+
+ tm.begin();
+
+ Transaction tx2 = tm.getTransaction();
+ tx2.enlistResource(res1);
+
+ // Receive 2nd message in a different tx
+ TextMessage r2 = (TextMessage)cons1.receive(5000);
+ Assert.assertNotNull(r2);
+ Assert.assertEquals("jellyfish2", r2.getText());
+
+ cons1.close();
+
+ tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+ // rollback this transaction
+ tm.rollback();
+
+ // verify that second message is available
+ conn2.close();
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn2.start();
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ TextMessage r3 = (TextMessage)cons.receive(5000);
+
+ Assert.assertNotNull(r3);
+ Assert.assertEquals("jellyfish2", r3.getText());
+ r3 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r3);
+
+ // rollback the other tx
+ tm.resume(suspended);
+ tm.rollback();
+
+ // Verify the first message is now available
+ r3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r3);
+ Assert.assertEquals("jellyfish1", r3.getText());
+ r3 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r3);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ }
+
+ }
+
+ @Test
+ public void testOneSessionTwoTransactionsCommitSend() throws Exception
+ {
+ XAConnection conn = null;
+
+ Connection conn2 = null;
+
+ try
+ {
+ conn = xacf.createXAConnection();
+
+ // Create a session
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ tm.begin();
+
+ Transaction tx1 = tm.getTransaction();
+ tx1.enlistResource(res1);
+
+ // Send a message
+ prod1.send(sess1.createTextMessage("kangaroo1"));
+
+ tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+ // suspend the tx
+ Transaction suspended = tm.suspend();
+
+ tm.begin();
+
+ // Send another message in another tx using the same session
+ Transaction tx2 = tm.getTransaction();
+ tx2.enlistResource(res1);
+
+ // Send a message
+ prod1.send(sess1.createTextMessage("kangaroo2"));
+
+ tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+ // commit this transaction
+ tm.commit();
+
+ // verify only kangaroo2 message is sent
+ conn2 = cf.createConnection();
+ Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn2.start();
+ MessageConsumer cons = sess.createConsumer(queue1);
+ TextMessage r1 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r1);
+ Assert.assertEquals("kangaroo2", r1.getText());
+ TextMessage r2 = (TextMessage)cons.receive(100);
+ Assert.assertNull(r2);
+
+ // now resume the first tx and then commit it
+ tm.resume(suspended);
+
+ tm.commit();
+
+ // verify that the first text message is received
+ TextMessage r3 = (TextMessage)cons.receive(5000);
+ Assert.assertNotNull(r3);
+ Assert.assertEquals("kangaroo1", r3.getText());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ }
+
+ }
+
+ @Test
+ public void testIsSamRM() throws Exception
+ {
+ XAConnection conn = null;
+
+ conn = xacf.createXAConnection();
+
+ // Create a session
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+
+ // Create a session
+ XASession sess2 = conn.createXASession();
+ XAResource res2 = sess2.getXAResource();
+
+ Assert.assertTrue(res1.isSameRM(res2));
+ }
+
+ static class DummyXAResource implements XAResource
+ {
+ boolean failOnPrepare;
+
+ DummyXAResource()
+ {
+ }
+
+ DummyXAResource(final boolean failOnPrepare)
+ {
+ this.failOnPrepare = failOnPrepare;
+ }
+
+ public void commit(final Xid arg0, final boolean arg1) throws XAException
+ {
+ }
+
+ public void end(final Xid arg0, final int arg1) throws XAException
+ {
+ }
+
+ public void forget(final Xid arg0) throws XAException
+ {
+ }
+
+ public int getTransactionTimeout() throws XAException
+ {
+ return 0;
+ }
+
+ public boolean isSameRM(final XAResource arg0) throws XAException
+ {
+ return false;
+ }
+
+ public int prepare(final Xid arg0) throws XAException
+ {
+ if (failOnPrepare)
+ {
+ throw new XAException(XAException.XAER_RMFAIL);
+ }
+ return XAResource.XA_OK;
+ }
+
+ public Xid[] recover(final int arg0) throws XAException
+ {
+ return null;
+ }
+
+ public void rollback(final Xid arg0) throws XAException
+ {
+ }
+
+ public boolean setTransactionTimeout(final int arg0) throws XAException
+ {
+ return false;
+ }
+
+ public void start(final Xid arg0, final int arg1) throws XAException
+ {
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 6eb00bf..3c32bfe 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -181,18 +181,6 @@
<version>5.0.0.GA</version>
</dependency>
- <!-- Needed for JMS Bridge Tests -->
- <dependency>
- <groupId>org.jboss.jbossts.jts</groupId>
- <artifactId>jbossjts-jacorb</artifactId>
- <version>4.17.13.Final</version>
- </dependency>
- <dependency>
- <groupId>org.jboss</groupId>
- <artifactId>jboss-transaction-spi</artifactId>
- <version>7.1.0.Final</version>
- </dependency>
-
<!--Vertx provided dependencies-->
<dependency>
<groupId>io.vertx</groupId>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
deleted file mode 100644
index d5dfa9c..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.XAConnectionFactory;
-import javax.transaction.TransactionManager;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.TxControl;
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.management.ResourceNames;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.api.jms.management.JMSQueueControl;
-import org.apache.activemq.api.jms.management.TopicControl;
-import org.apache.activemq.core.config.Configuration;
-import org.apache.activemq.core.registry.JndiBindingRegistry;
-import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
-import org.apache.activemq.core.server.ActiveMQServer;
-import org.apache.activemq.core.server.ActiveMQServers;
-import org.apache.activemq.core.server.management.ManagementService;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
-import org.apache.activemq.jms.client.ActiveMQMessage;
-import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
-import org.apache.activemq.jms.server.JMSServerManager;
-import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.tests.unit.util.InVMNamingContext;
-import org.apache.activemq.tests.util.UnitTestCase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-/**
- * A BridgeTestBase
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- */
-public abstract class BridgeTestBase extends UnitTestCase
-{
- private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
-
- protected ConnectionFactoryFactory cff0, cff1;
-
- protected ConnectionFactoryFactory cff0xa, cff1xa;
-
- protected ConnectionFactory cf0, cf1;
-
- protected XAConnectionFactory cf0xa, cf1xa;
-
- protected DestinationFactory sourceQueueFactory;
- protected DestinationFactory targetQueueFactory;
- protected DestinationFactory localTargetQueueFactory;
- protected DestinationFactory sourceTopicFactory;
-
- protected Queue sourceQueue, targetQueue, localTargetQueue;
-
- protected Topic sourceTopic;
-
- protected ActiveMQServer server0;
-
- protected JMSServerManager jmsServer0;
-
- protected ActiveMQServer server1;
-
- protected JMSServerManager jmsServer1;
-
- private InVMNamingContext context0;
-
- protected InVMNamingContext context1;
-
- protected HashMap<String, Object> params1;
-
- protected ConnectionFactoryFactory cff0LowProducerWindow;
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- // Start the servers
- Configuration conf0 = createBasicConfig()
- .setJournalDirectory(getJournalDir(0, false))
- .setBindingsDirectory(getBindingsDir(0, false))
- .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
-
- server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, false));
-
- context0 = new InVMNamingContext();
- jmsServer0 = new JMSServerManagerImpl(server0);
- jmsServer0.setRegistry(new JndiBindingRegistry(context0));
- jmsServer0.start();
-
- params1 = new HashMap<String, Object>();
- params1.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- Configuration conf1 = createBasicConfig()
- .setJournalDirectory(getJournalDir(1, false))
- .setBindingsDirectory(getBindingsDir(1, false))
- .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params1));
-
- server1 = addServer(ActiveMQServers.newActiveMQServer(conf1, false));
-
- context1 = new InVMNamingContext();
-
- jmsServer1 = new JMSServerManagerImpl(server1);
- jmsServer1.setRegistry(new JndiBindingRegistry(context1));
- jmsServer1.start();
-
- createQueue("sourceQueue", 0);
-
- jmsServer0.createTopic(false, "sourceTopic", "/topic/sourceTopic");
-
- createQueue("localTargetQueue", 0);
-
- createQueue("targetQueue", 1);
-
- setUpAdministeredObjects();
- TxControl.enable();
- // We need a local transaction and recovery manager
- // We must start this after the remote servers have been created or it won't
- // have deleted the database and the recovery manager may attempt to recover transactions
-
- }
-
- protected void createQueue(final String queueName, final int index) throws Exception
- {
- JMSServerManager server = jmsServer0;
- if (index == 1)
- {
- server = jmsServer1;
- }
- assertTrue("queue '/queue/" + queueName + "' created",
- server.createQueue(false, queueName, null, true, "/queue/" + queueName));
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- checkEmpty(sourceQueue, 0);
- checkEmpty(localTargetQueue, 0);
- checkEmpty(targetQueue, 1);
-
- // Check no subscriptions left lying around
-
- checkNoSubscriptions(sourceTopic, 0);
- if (cff0 instanceof ActiveMQConnectionFactory)
- {
- ((ActiveMQConnectionFactory) cff0).close();
- }
- if (cff1 instanceof ActiveMQConnectionFactory)
- {
- ((ActiveMQConnectionFactory) cff1).close();
- }
- stopComponent(jmsServer0);
- stopComponent(jmsServer1);
- cff0 = cff1 = null;
- cff0xa = cff1xa = null;
-
- cf0 = cf1 = null;
-
- cf0xa = cf1xa = null;
-
- sourceQueueFactory = targetQueueFactory = localTargetQueueFactory = sourceTopicFactory = null;
-
- sourceQueue = targetQueue = localTargetQueue = null;
-
- sourceTopic = null;
-
- server0 = null;
-
- jmsServer0 = null;
-
- server1 = null;
-
- jmsServer1 = null;
- if (context0 != null)
- context0.close();
- context0 = null;
- if (context1 != null)
- context1.close();
- context1 = null;
-
- // Shutting down Arjuna threads
- TxControl.disable(true);
-
- TransactionReaper.terminate(false);
- super.tearDown();
- }
-
-
- protected void setUpAdministeredObjects() throws Exception
- {
- cff0LowProducerWindow = new ConnectionFactoryFactory()
- {
- public ConnectionFactory createConnectionFactory() throws Exception
- {
- ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY));
-
- // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
- cf.setReconnectAttempts(0);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
- cf.setProducerWindowSize(100);
-
- return cf;
- }
-
- };
-
-
- cff0 = new ConnectionFactoryFactory()
- {
- public ConnectionFactory createConnectionFactory() throws Exception
- {
- ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY));
-
- // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
- cf.setReconnectAttempts(0);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
-
- return cf;
- }
-
- };
-
- cff0xa = new ConnectionFactoryFactory()
- {
- public Object createConnectionFactory() throws Exception
- {
- ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY));
-
- // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
- cf.setReconnectAttempts(0);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
-
- return cf;
- }
-
- };
-
- cf0 = (ConnectionFactory) cff0.createConnectionFactory();
- cf0xa = (XAConnectionFactory) cff0xa.createConnectionFactory();
-
- cff1 = new ConnectionFactoryFactory()
- {
-
- public ConnectionFactory createConnectionFactory() throws Exception
- {
- ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY,
- params1));
-
- // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
- cf.setReconnectAttempts(0);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
-
- return cf;
- }
- };
-
- cff1xa = new ConnectionFactoryFactory()
- {
-
- public XAConnectionFactory createConnectionFactory() throws Exception
- {
- ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF,
- new TransportConfiguration(
- INVM_CONNECTOR_FACTORY,
- params1));
-
- // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
- cf.setReconnectAttempts(0);
- cf.setBlockOnNonDurableSend(true);
- cf.setBlockOnDurableSend(true);
- cf.setCacheLargeMessagesClient(true);
-
- return cf;
- }
- };
-
- cf1 = (ConnectionFactory) cff1.createConnectionFactory();
- cf1xa = (XAConnectionFactory) cff1xa.createConnectionFactory();
-
- sourceQueueFactory = new DestinationFactory()
- {
- public Destination createDestination() throws Exception
- {
- return (Destination) context0.lookup("/queue/sourceQueue");
- }
- };
-
- sourceQueue = (Queue) sourceQueueFactory.createDestination();
-
- targetQueueFactory = new DestinationFactory()
- {
- public Destination createDestination() throws Exception
- {
- return (Destination) context1.lookup("/queue/targetQueue");
- }
- };
-
- targetQueue = (Queue) targetQueueFactory.createDestination();
-
- sourceTopicFactory = new DestinationFactory()
- {
- public Destination createDestination() throws Exception
- {
- return (Destination) context0.lookup("/topic/sourceTopic");
- }
- };
-
- sourceTopic = (Topic) sourceTopicFactory.createDestination();
-
- localTargetQueueFactory = new DestinationFactory()
- {
- public Destination createDestination() throws Exception
- {
- return (Destination) context0.lookup("/queue/localTargetQueue");
- }
- };
-
- localTargetQueue = (Queue) localTargetQueueFactory.createDestination();
- }
-
- protected void sendMessages(final ConnectionFactory cf,
- final Destination dest,
- final int start,
- final int numMessages,
- final boolean persistent,
- final boolean largeMessage) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(dest);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = start; i < start + numMessages; i++)
- {
- if (largeMessage)
- {
- BytesMessage msg = sess.createBytesMessage();
- ((ActiveMQMessage) msg).setInputStream(UnitTestCase.createFakeLargeStream(1024L * 1024L));
- msg.setStringProperty("msg", "message" + i);
- prod.send(msg);
- }
- else
- {
- TextMessage tm = sess.createTextMessage("message" + i);
- prod.send(tm);
- }
-
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- protected void checkMessagesReceived(final ConnectionFactory cf,
- final Destination dest,
- final QualityOfServiceMode qosMode,
- final int numMessages,
- final boolean longWaitForFirst,
- final boolean largeMessage) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(dest);
-
- // Consume the messages
-
- Set<String> msgs = new HashSet<String>();
-
- int count = 0;
-
- // We always wait longer for the first one - it may take some time to arrive especially if we are
- // waiting for recovery to kick in
- while (true)
- {
- Message tm = cons.receive(count == 0 ? (longWaitForFirst ? 60000 : 10000) : 5000);
-
- if (tm == null)
- {
- break;
- }
-
- // log.info("Got message " + tm.getText());
-
- if (largeMessage)
- {
- BytesMessage bmsg = (BytesMessage) tm;
- msgs.add(tm.getStringProperty("msg"));
- byte[] buffRead = new byte[1024];
- for (int i = 0; i < 1024; i++)
- {
- Assert.assertEquals(1024, bmsg.readBytes(buffRead));
- }
- }
- else
- {
- msgs.add(((TextMessage) tm).getText());
- }
-
- count++;
-
- }
-
- if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || qosMode == QualityOfServiceMode.DUPLICATES_OK)
- {
- // All the messages should be received
-
- for (int i = 0; i < numMessages; i++)
- {
- Assert.assertTrue("quality=" + qosMode + ", #=" + i + ", message=" + msgs, msgs.contains("message" + i));
- }
-
- // Should be no more
- if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
- {
- Assert.assertEquals(numMessages, msgs.size());
- }
- }
- else if (qosMode == QualityOfServiceMode.AT_MOST_ONCE)
- {
- // No *guarantee* that any messages will be received
- // but you still might get some depending on how/where the crash occurred
- }
-
- BridgeTestBase.log.trace("Check complete");
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- protected void checkAllMessageReceivedInOrder(final ConnectionFactory cf,
- final Destination dest,
- final int start,
- final int numMessages,
- final boolean largeMessage) throws Exception
- {
- Connection conn = null;
- try
- {
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(dest);
-
- // Consume the messages
-
- for (int i = 0; i < numMessages; i++)
- {
- Message tm = cons.receive(30000);
-
- Assert.assertNotNull(tm);
-
- if (largeMessage)
- {
- BytesMessage bmsg = (BytesMessage) tm;
- Assert.assertEquals("message" + (i + start), tm.getStringProperty("msg"));
- byte[] buffRead = new byte[1024];
- for (int j = 0; j < 1024; j++)
- {
- Assert.assertEquals(1024, bmsg.readBytes(buffRead));
- }
- }
- else
- {
- Assert.assertEquals("message" + (i + start), ((TextMessage) tm).getText());
- }
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public boolean checkEmpty(final Queue queue, final int index) throws Exception
- {
- ManagementService managementService = server0.getManagementService();
- if (index == 1)
- {
- managementService = server1.getManagementService();
- }
- JMSQueueControl queueControl = (JMSQueueControl) managementService.getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
-
- //server may be closed
- if (queueControl != null)
- {
- queueControl.flushExecutor();
- Long messageCount = queueControl.getMessageCount();
-
- if (messageCount > 0)
- {
- queueControl.removeMessages(null);
- }
- }
- return true;
- }
-
- protected void checkNoSubscriptions(final Topic topic, final int index) throws Exception
- {
- ManagementService managementService = server0.getManagementService();
- if (index == 1)
- {
- managementService = server1.getManagementService();
- }
- TopicControl topicControl = (TopicControl) managementService.getResource(ResourceNames.JMS_TOPIC + topic.getTopicName());
- Assert.assertEquals(0, topicControl.getSubscriptionCount());
-
- }
-
- protected void removeAllMessages(final String queueName, final int index) throws Exception
- {
- ManagementService managementService = server0.getManagementService();
- if (index == 1)
- {
- managementService = server1.getManagementService();
- }
- JMSQueueControl queueControl = (JMSQueueControl) managementService.getResource(ResourceNames.JMS_QUEUE + queueName);
- queueControl.removeMessages(null);
- }
-
- protected TransactionManager newTransactionManager()
- {
- return new TransactionManagerImple();
- }
-
- // Inner classes -------------------------------------------------------------------
-}