You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:37 UTC
[04/15] activemq-6 git commit: Refactored the testsuite a bit
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/XATest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/XATest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/XATest.java
deleted file mode 100644
index 019b534..0000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/XATest.java
+++ /dev/null
@@ -1,2336 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-
-import org.apache.activemq.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.jboss.tm.TxUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- * XATest: JMS sends and receives enlisted in JTA transactions (1PC and 2PC, commit and rollback).
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- *
- */
-public class XATest extends ActiveMQServerTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- protected TransactionManager tm;
-
- protected Transaction suspendedTx;
-
- protected XAConnectionFactory xacf;
-
- protected ConnectionFactory cf;
-
- // Constructors --------------------------------------------------
-
- // TestCase overrides -------------------------------------------
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- cf = getConnectionFactory();
-
- xacf = getXAConnectionFactory();
-
- tm = getTransactionManager();
-
- ProxyAssertSupport.assertTrue(tm instanceof TransactionManagerImple);
-
- suspendedTx = tm.suspend();
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- if (TxUtils.isUncommitted(tm))
- {
- // roll it back
- try
- {
- tm.rollback();
- }
- catch (Throwable ignore)
- {
- // The connection will probably be closed so this may well throw an exception
- }
- }
- if (tm.getTransaction() != null)
- {
- Transaction tx = tm.suspend();
- if (tx != null)
- {
- log.warn("Transaction still associated with thread " + tx +
- " at status " +
- TxUtils.getStatusAsString(tx.getStatus()));
- }
- }
-
- if (suspendedTx != null)
- {
- tm.resume(suspendedTx);
- }
- }
-
- // Public --------------------------------------------------------
-
- @Test
- public void test2PCSendCommit1PCOptimization() throws Exception
- {
- // Since both resources have same RM, TM will probably use 1PC optimization
-
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- TextMessage m2 = (TextMessage)cons.receive(1000);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(1000);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCSendCommit() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
-
- XAResource res = sess.getXAResource();
- XAResource res2 = new DummyXAResource();
-
- // Would prevent the 1PC optimization from being used, but the call is currently disabled:
- // res.setForceNotSameRM(true);
-
- Transaction tx = tm.getTransaction();
-
- tx.enlistResource(res);
-
- tx.enlistResource(res2);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCSendRollback1PCOptimization() throws Exception
- {
- // Since both resources have the same RM, TM will probably use 1PC optimization
-
- XAConnection conn = null;
- Connection conn2 = null;
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- Message m2 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(m2);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCSendFailOnPrepare() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- // Would prevent the 1PC optimisation, but the call is currently disabled:
- // res.setForceNotSameRM(true);
-
- XAResource res2 = new DummyXAResource(true);
- XAResource res3 = new DummyXAResource();
- XAResource res4 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
- tx.enlistResource(res3);
- tx.enlistResource(res4);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
- tx.delistResource(res3, XAResource.TMSUCCESS);
- tx.delistResource(res4, XAResource.TMSUCCESS);
-
- try
- {
- tm.commit();
-
- ProxyAssertSupport.fail("should not get here");
- }
- catch (Exception e)
- {
- // We should expect this
- }
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- Message m2 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(m2);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCSendRollback() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- // Would prevent the 1PC optimisation, but the call is currently disabled:
- // res.setForceNotSameRM(true);
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- Message m2 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(m2);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCReceiveCommit1PCOptimization() throws Exception
- {
- // Since both resources have the same RM, TM will probably use 1PC optimization
-
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- conn2.start();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
-
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- Message m3 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-
- ProxyAssertSupport.assertNull(m3);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void test2PCReceiveCommit() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- conn2.start();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
- // res.setForceNotSameRM(true);
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
-
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- Message m3 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-
- ProxyAssertSupport.assertNull(m3);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void test2PCReceiveRollback1PCOptimization() throws Exception
- {
- // Since both resources have the same RM, TM will probably use 1PC optimization
-
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
-
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- // Message should be redelivered
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- TextMessage m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest1", m3.getText());
- m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest2", m3.getText());
-
- ProxyAssertSupport.assertTrue(m3.getJMSRedelivered());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test2PCReceiveRollback() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
-
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
- // res.setForceNotSameRM(true);
-
- XAResource res2 = new DummyXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- // Message should be redelivered
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
- tx.enlistResource(res2);
-
- TextMessage m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest1", m3.getText());
- m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest2", m3.getText());
-
- ProxyAssertSupport.assertTrue(m3.getJMSRedelivered());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.commit();
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void test1PCSendCommit() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.commit();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void test1PCSendRollback() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
- try
- {
- conn = xacf.createXAConnection();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
-
- MessageProducer prod = sess.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message m = sess.createTextMessage("XATest1");
- prod.send(queue1, m);
- m = sess.createTextMessage("XATest2");
- prod.send(queue1, m);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- conn2 = cf.createConnection();
- conn2.start();
- Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceiver.createConsumer(queue1);
- Message m2 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(m2);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- @Test
- public void test1PCReceiveCommit() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- conn2.start();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.commit();
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- Message m3 = cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-
- ProxyAssertSupport.assertNull(m3);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.commit();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void test1PCReceiveRollback() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("XATest1");
- prod.send(m);
- m = sessProducer.createTextMessage("XATest2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- XASession sess = conn.createXASession();
- XAResource res = sess.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- TextMessage m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest1", m2.getText());
-
- m2 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m2);
- ProxyAssertSupport.assertEquals("XATest2", m2.getText());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- // Message should be redelivered
-
- // New tx
- tm.begin();
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- TextMessage m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest1", m3.getText());
-
- m3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(m3);
- ProxyAssertSupport.assertEquals("XATest2", m3.getText());
-
- ProxyAssertSupport.assertTrue(m3.getJMSRedelivered());
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
- tm.commit();
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void testMultipleSessionsOneTxCommitAcknowledge1PCOptimization() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- // Since both resources have the same RM, TM will probably use 1PC optimization
-
- try
- {
- // First send 2 messages
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("jellyfish1");
- prod.send(m);
- m = sessProducer.createTextMessage("jellyfish2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- // Create 2 sessions and enlist them
- XASession sess1 = conn.createXASession();
- XAResource res1 = sess1.getXAResource();
- XASession sess2 = conn.createXASession();
- XAResource res2 = sess2.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res1);
- tx.enlistResource(res2);
-
- // Receive the messages, one on each consumer
- MessageConsumer cons1 = sess1.createConsumer(queue1);
- TextMessage r1 = (TextMessage)cons1.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r1);
- ProxyAssertSupport.assertEquals("jellyfish1", r1.getText());
-
- cons1.close();
-
- MessageConsumer cons2 = sess2.createConsumer(queue1);
- TextMessage r2 = (TextMessage)cons2.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r2);
- ProxyAssertSupport.assertEquals("jellyfish2", r2.getText());
-
- tx.delistResource(res1, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- // commit
- tm.commit();
-
- Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sess.createConsumer(queue1);
- conn2.start();
-
- TextMessage r3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(r3);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void testMultipleSessionsOneTxCommitAcknowledge() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- try
- {
- // First send 2 messages
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("jellyfish1");
- prod.send(m);
- m = sessProducer.createTextMessage("jellyfish2");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- // Create 2 sessions and enlist them
- XASession sess1 = conn.createXASession();
- ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
- XASession sess2 = conn.createXASession();
- ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
- res1.setForceNotSameRM(true);
- res2.setForceNotSameRM(true);
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res1);
- tx.enlistResource(res2);
-
- // Receive the messages, one on each consumer
- MessageConsumer cons1 = sess1.createConsumer(queue1);
- TextMessage r1 = (TextMessage)cons1.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r1);
- ProxyAssertSupport.assertEquals("jellyfish1", r1.getText());
-
- cons1.close();
-
- MessageConsumer cons2 = sess2.createConsumer(queue1);
- TextMessage r2 = (TextMessage)cons2.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r2);
- ProxyAssertSupport.assertEquals("jellyfish2", r2.getText());
-
- tx.delistResource(res1, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- // commit
- tm.commit();
-
- Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sess.createConsumer(queue1);
- conn2.start();
-
- TextMessage r3 = (TextMessage)cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
- ProxyAssertSupport.assertNull(r3);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- @Test
- public void testMultipleSessionsOneTxRollbackAcknowledge1PCOptimization() throws Exception
- {
- XAConnection conn = null;
- Connection conn2 = null;
-
- // Since both resources have the same RM, TM will probably use 1PC optimization
-
- try
- {
- // First send 4 messages
- conn2 = cf.createConnection();
- Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProducer.createProducer(queue1);
- Message m = sessProducer.createTextMessage("jellyfish1");
- prod.send(m);
- m = sessProducer.createTextMessage("jellyfish2");
- prod.send(m);
- m = sessProducer.createTextMessage("jellyfish3");
- prod.send(m);
- m = sessProducer.createTextMessage("jellyfish4");
- prod.send(m);
-
- conn = xacf.createXAConnection();
- conn.start();
-
- tm.begin();
-
- // Create 2 sessions and enlist them
- XASession sess1 = conn.createXASession();
- ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
- XASession sess2 = conn.createXASession();
- ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
-
- Transaction tx = tm.getTransaction();
- tx.enlistResource(res1);
- tx.enlistResource(res2);
-
- // Receive the messages, two on each consumer
- MessageConsumer cons1 = sess1.createConsumer(queue1);
- TextMessage r1 = (TextMessage)cons1.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r1);
- ProxyAssertSupport.assertEquals("jellyfish1", r1.getText());
-
- r1 = (TextMessage)cons1.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r1);
- ProxyAssertSupport.assertEquals("jellyfish2", r1.getText());
-
- cons1.close();
-
- MessageConsumer cons2 = sess2.createConsumer(queue1);
- TextMessage r2 = (TextMessage)cons2.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r2);
- ProxyAssertSupport.assertEquals("jellyfish3", r2.getText());
-
- r2 = (TextMessage)cons2.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r2);
- ProxyAssertSupport.assertEquals("jellyfish4", r2.getText());
-
- cons2.close();
-
- // rollback
-
- tx.delistResource(res1, XAResource.TMSUCCESS);
- tx.delistResource(res2, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- // Rollback causes a cancel, which is asynchronous, so wait before consuming
- Thread.sleep(1000);
-
- // We cannot assume anything about the order in which the transaction manager rolls back
- // the sessions - this is implementation dependent
-
- Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sess.createConsumer(queue1);
- conn2.start();
-
- TextMessage r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
- ProxyAssertSupport.assertNotNull(r);
-
- boolean session1First = false;
-
- if (r.getText().equals("jellyfish1"))
- {
- session1First = true;
- }
- else if (r.getText().equals("jellyfish3"))
- {
- session1First = false;
- }
- else
- {
- ProxyAssertSupport.fail("Unexpected message");
- }
-
- if (session1First)
- {
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish2", r.getText());
-
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish3", r.getText());
-
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish4", r.getText());
-
- }
- else
- {
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish4", r.getText());
-
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish1", r.getText());
-
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
- ProxyAssertSupport.assertNotNull(r);
-
- ProxyAssertSupport.assertEquals("jellyfish2", r.getText());
- }
-
- r = (TextMessage)cons.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-
- ProxyAssertSupport.assertNull(r);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- }
-
- }
-
- /**
-  * Consumes four messages through two XA sessions enlisted in one transaction (each resource is
-  * told to report itself as a different RM), rolls the transaction back and checks that all
-  * four messages can be received again by a plain consumer.
-  */
- @Test
- public void testMultipleSessionsOneTxRollbackAcknowledge() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       // Preload the queue with four messages
-       plainConnection = cf.createConnection();
-       Session sendSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageProducer sender = sendSession.createProducer(queue1);
-       for (String text : new String[] {"jellyfish1", "jellyfish2", "jellyfish3", "jellyfish4"})
-       {
-          Message outgoing = sendSession.createTextMessage(text);
-          sender.send(outgoing);
-       }
-
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       // Two XA sessions on the same connection, both enlisted in the one transaction
-       XASession firstSession = xaConnection.createXASession();
-       ClientSessionInternal firstResource = (ClientSessionInternal)firstSession.getXAResource();
-       XASession secondSession = xaConnection.createXASession();
-       ClientSessionInternal secondResource = (ClientSessionInternal)secondSession.getXAResource();
-       firstResource.setForceNotSameRM(true);
-       secondResource.setForceNotSameRM(true);
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(firstResource);
-       tx.enlistResource(secondResource);
-
-       // The first session takes the first two messages
-       MessageConsumer firstConsumer = firstSession.createConsumer(queue1);
-       for (String expected : new String[] {"jellyfish1", "jellyfish2"})
-       {
-          TextMessage received = (TextMessage)firstConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(received);
-          ProxyAssertSupport.assertEquals(expected, received.getText());
-       }
-       firstConsumer.close();
-
-       // Cancel is asynch
-       Thread.sleep(500);
-
-       // The second session takes the remaining two
-       MessageConsumer secondConsumer = secondSession.createConsumer(queue1);
-       for (String expected : new String[] {"jellyfish3", "jellyfish4"})
-       {
-          TextMessage received = (TextMessage)secondConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(received);
-          ProxyAssertSupport.assertEquals(expected, received.getText());
-       }
-       secondConsumer.close();
-
-       // Roll everything back
-       tx.delistResource(firstResource, XAResource.TMSUCCESS);
-       tx.delistResource(secondResource, XAResource.TMSUCCESS);
-
-       tm.rollback();
-
-       // Rollback causes cancel which is asynch
-       Thread.sleep(1000);
-
-       // The order in which the transaction manager rolls back the two sessions is
-       // implementation dependent, so either session's messages may come back first
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       TextMessage redelivered = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(redelivered);
-
-       String headText = redelivered.getText();
-       boolean session1First = false;
-       if (headText.equals("jellyfish1"))
-       {
-          session1First = true;
-       }
-       else if (!headText.equals("jellyfish3"))
-       {
-          ProxyAssertSupport.fail("Unexpected message");
-       }
-
-       String[] remaining = session1First
-          ? new String[] {"jellyfish2", "jellyfish3", "jellyfish4"}
-          : new String[] {"jellyfish4", "jellyfish1", "jellyfish2"};
-
-       for (String expected : remaining)
-       {
-          redelivered = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(redelivered);
-          ProxyAssertSupport.assertEquals(expected, redelivered.getText());
-       }
-
-       // Nothing else may be left on the queue
-       redelivered = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(redelivered);
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Enlists an XA session together with a {@code DummyXAResource} constructed with
-  * {@code failOnPrepare = true}, consumes four messages, and checks that after the failed
-  * {@code tm.commit()} all four messages can be received again in their original order.
-  */
- @Test
- public void testMultipleSessionsOneTxRollbackAcknowledgeForceFailureInCommit() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    final String[] payloads = {"jellyfish1", "jellyfish2", "jellyfish3", "jellyfish4"};
-
-    try
-    {
-       // Preload the queue with four messages
-       plainConnection = cf.createConnection();
-       Session sendSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageProducer sender = sendSession.createProducer(queue1);
-
-       for (String text : payloads)
-       {
-          Message outgoing = sendSession.createTextMessage(text);
-          sender.send(outgoing);
-       }
-
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       XASession xaSession = xaConnection.createXASession();
-       XAResource sessionResource = xaSession.getXAResource();
-       DummyXAResource failingResource = new DummyXAResource(true);
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(sessionResource);
-       tx.enlistResource(failingResource);
-
-       // Drain the queue inside the transaction
-       MessageConsumer xaConsumer = xaSession.createConsumer(queue1);
-       TextMessage consumed;
-       for (String expected : payloads)
-       {
-          consumed = (TextMessage)xaConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(consumed);
-          ProxyAssertSupport.assertEquals(expected, consumed.getText());
-       }
-
-       consumed = (TextMessage)xaConsumer.receive(1000);
-       ProxyAssertSupport.assertNull(consumed);
-
-       xaConsumer.close();
-
-       // Try to commit: the dummy resource is set up to throw from prepare, which should
-       // cause rollback to be called on the other resource
-       tx.delistResource(sessionResource, XAResource.TMSUCCESS);
-       tx.delistResource(failingResource, XAResource.TMSUCCESS);
-
-       // Rollback will cause an attempt to deliver messages locally to the original consumers.
-       // The original consumer has closed, so the messages will be cancelled to the server.
-       // The server cancel is asynch, so we need to sleep for a bit to make sure it completes.
-       log.trace("Forcing failure");
-       try
-       {
-          tm.commit();
-          ProxyAssertSupport.fail("should not get here");
-       }
-       catch (Exception e)
-       {
-          // We should expect this
-       }
-
-       Thread.sleep(1000);
-
-       // All four messages must be back, in order
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       TextMessage redelivered;
-       for (String expected : payloads)
-       {
-          redelivered = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(redelivered);
-          ProxyAssertSupport.assertEquals(expected, redelivered.getText());
-       }
-
-       redelivered = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(redelivered);
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Sends one message from each of two XA sessions enlisted in the same transaction and checks
-  * that, after the commit, both messages are received in send order by a plain consumer.
-  */
- @Test
- public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception
- {
-    // The two resources are expected to share an RM, so the TM will probably use the
-    // 1PC optimization
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       // Two sessions, both enlisted
-       XASession firstSession = xaConnection.createXASession();
-       XAResource firstResource = firstSession.getXAResource();
-       XASession secondSession = xaConnection.createXASession();
-       XAResource secondResource = secondSession.getXAResource();
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(firstResource);
-       tx.enlistResource(secondResource);
-
-       // One message from each session
-       MessageProducer firstSender = firstSession.createProducer(queue1);
-       MessageProducer secondSender = secondSession.createProducer(queue1);
-
-       firstSender.send(firstSession.createTextMessage("echidna1"));
-       secondSender.send(secondSession.createTextMessage("echidna2"));
-
-       tx.delistResource(firstResource, XAResource.TMSUCCESS);
-       tx.delistResource(secondResource, XAResource.TMSUCCESS);
-
-       tm.commit();
-
-       // Both messages should now be in the queue
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       for (String expected : new String[] {"echidna1", "echidna2"})
-       {
-          TextMessage received = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(received);
-          ProxyAssertSupport.assertEquals(expected, received.getText());
-       }
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Same scenario as the 1PC variant of this test, except that both XA resources are told via
-  * {@code setForceNotSameRM(true)} to report themselves as different resource managers
-  * (presumably so the TM cannot treat them as a single RM).
-  */
- @Test
- public void testMultipleSessionsOneTxCommitSend() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       // Two sessions, both enlisted, each forced to look like a distinct RM
-       XASession firstSession = xaConnection.createXASession();
-       ClientSessionInternal firstResource = (ClientSessionInternal)firstSession.getXAResource();
-       XASession secondSession = xaConnection.createXASession();
-       ClientSessionInternal secondResource = (ClientSessionInternal)secondSession.getXAResource();
-       firstResource.setForceNotSameRM(true);
-       secondResource.setForceNotSameRM(true);
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(firstResource);
-       tx.enlistResource(secondResource);
-
-       // One message from each session
-       MessageProducer firstSender = firstSession.createProducer(queue1);
-       MessageProducer secondSender = secondSession.createProducer(queue1);
-
-       firstSender.send(firstSession.createTextMessage("echidna1"));
-       secondSender.send(secondSession.createTextMessage("echidna2"));
-
-       tx.delistResource(firstResource, XAResource.TMSUCCESS);
-       tx.delistResource(secondResource, XAResource.TMSUCCESS);
-
-       tm.commit();
-
-       // Both messages should now be in the queue
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       for (String expected : new String[] {"echidna1", "echidna2"})
-       {
-          TextMessage received = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-          ProxyAssertSupport.assertNotNull(received);
-          ProxyAssertSupport.assertEquals(expected, received.getText());
-       }
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Sends one message from each of two XA sessions enlisted in the same transaction, rolls the
-  * transaction back and checks that a plain consumer finds nothing on the queue.
-  */
- @Test
- public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception
- {
-    // The two resources are expected to share an RM, so the TM will probably use the
-    // 1PC optimization
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       // Two sessions, both enlisted
-       XASession firstSession = xaConnection.createXASession();
-       XAResource firstResource = firstSession.getXAResource();
-       XASession secondSession = xaConnection.createXASession();
-       XAResource secondResource = secondSession.getXAResource();
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(firstResource);
-       tx.enlistResource(secondResource);
-
-       // One message from each session
-       MessageProducer firstSender = firstSession.createProducer(queue1);
-       MessageProducer secondSender = secondSession.createProducer(queue1);
-
-       firstSender.send(firstSession.createTextMessage("echidna1"));
-       secondSender.send(secondSession.createTextMessage("echidna2"));
-
-       tx.delistResource(firstResource, XAResource.TMSUCCESS);
-       tx.delistResource(secondResource, XAResource.TMSUCCESS);
-
-       tm.rollback();
-
-       // Neither message should have reached the queue
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       TextMessage unexpected = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(unexpected);
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Sends one message from each of two XA sessions whose resources are forced to report
-  * themselves as different RMs, rolls the shared transaction back and checks that a plain
-  * consumer finds nothing on the queue.
-  */
- @Test
- public void testMultipleSessionsOneTxRollbackSend() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-       xaConnection.start();
-
-       tm.begin();
-
-       // Two sessions, both enlisted, each forced to look like a distinct RM
-       XASession firstSession = xaConnection.createXASession();
-       ClientSessionInternal firstResource = (ClientSessionInternal)firstSession.getXAResource();
-       XASession secondSession = xaConnection.createXASession();
-       ClientSessionInternal secondResource = (ClientSessionInternal)secondSession.getXAResource();
-       firstResource.setForceNotSameRM(true);
-       secondResource.setForceNotSameRM(true);
-
-       Transaction tx = tm.getTransaction();
-       tx.enlistResource(firstResource);
-       tx.enlistResource(secondResource);
-
-       // One message from each session
-       MessageProducer firstSender = firstSession.createProducer(queue1);
-       MessageProducer secondSender = secondSession.createProducer(queue1);
-
-       firstSender.send(firstSession.createTextMessage("echidna1"));
-       secondSender.send(secondSession.createTextMessage("echidna2"));
-
-       tx.delistResource(firstResource, XAResource.TMSUCCESS);
-       tx.delistResource(secondResource, XAResource.TMSUCCESS);
-
-       tm.rollback();
-
-       // Neither message should have reached the queue
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       plainConnection.start();
-
-       TextMessage unexpected = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(unexpected);
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Uses a single XA session in two transactions: the first message is received in a
-  * transaction that is then suspended, the second in a new transaction that is committed.
-  * Checks that no message is available to another consumer, then resumes and commits the
-  * first transaction.
-  */
- @Test
- public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       // Preload the queue with two messages
-       plainConnection = cf.createConnection();
-       Session sendSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageProducer sender = sendSession.createProducer(queue1);
-       for (String text : new String[] {"jellyfish1", "jellyfish2"})
-       {
-          Message outgoing = sendSession.createTextMessage(text);
-          sender.send(outgoing);
-       }
-
-       xaConnection = xacf.createXAConnection();
-
-       // The one session shared by both transactions
-       XASession xaSession = xaConnection.createXASession();
-       XAResource resource = xaSession.getXAResource();
-
-       xaConnection.start();
-       MessageConsumer xaConsumer = xaSession.createConsumer(queue1);
-
-       // First transaction: receive the first message, then park the transaction
-       tm.begin();
-
-       Transaction firstTx = tm.getTransaction();
-       firstTx.enlistResource(resource);
-
-       TextMessage first = (TextMessage)xaConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(first);
-       ProxyAssertSupport.assertEquals("jellyfish1", first.getText());
-
-       firstTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       Transaction suspended = tm.suspend();
-
-       // Second transaction: receive the second message and commit
-       tm.begin();
-
-       Transaction secondTx = tm.getTransaction();
-       secondTx.enlistResource(resource);
-
-       TextMessage second = (TextMessage)xaConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(second);
-       ProxyAssertSupport.assertEquals("jellyfish2", second.getText());
-
-       secondTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       tm.commit();
-
-       // A fresh plain connection must not see any message
-       plainConnection.close();
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       plainConnection.start();
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       TextMessage leftover = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(leftover);
-
-       // Finally resume the parked transaction and commit it too
-       tm.resume(suspended);
-
-       tm.commit();
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Uses a single XA session in two transactions and rolls both back, one after the other.
-  * After the second transaction is rolled back only "jellyfish2" must be available; after the
-  * suspended first transaction is resumed and rolled back, "jellyfish1" must be available too.
-  */
- @Test
- public void testOneSessionTwoTransactionsRollbackAcknowledge() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       // Preload the queue with two messages
-       plainConnection = cf.createConnection();
-       Session sendSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageProducer sender = sendSession.createProducer(queue1);
-       for (String text : new String[] {"jellyfish1", "jellyfish2"})
-       {
-          Message outgoing = sendSession.createTextMessage(text);
-          sender.send(outgoing);
-       }
-
-       xaConnection = xacf.createXAConnection();
-
-       // The one session shared by both transactions
-       XASession xaSession = xaConnection.createXASession();
-       XAResource resource = xaSession.getXAResource();
-
-       xaConnection.start();
-       MessageConsumer xaConsumer = xaSession.createConsumer(queue1);
-
-       // First transaction: receive the first message, then park the transaction
-       tm.begin();
-
-       Transaction firstTx = tm.getTransaction();
-       firstTx.enlistResource(resource);
-
-       TextMessage first = (TextMessage)xaConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(first);
-       ProxyAssertSupport.assertEquals("jellyfish1", first.getText());
-
-       firstTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       Transaction suspended = tm.suspend();
-
-       // Second transaction: receive the second message, then roll back
-       tm.begin();
-
-       Transaction secondTx = tm.getTransaction();
-       secondTx.enlistResource(resource);
-
-       TextMessage second = (TextMessage)xaConsumer.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(second);
-       ProxyAssertSupport.assertEquals("jellyfish2", second.getText());
-
-       xaConsumer.close();
-
-       secondTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       tm.rollback();
-
-       // Only the second message should be visible to a fresh plain connection
-       plainConnection.close();
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       plainConnection.start();
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-
-       TextMessage available = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-
-       ProxyAssertSupport.assertNotNull(available);
-       ProxyAssertSupport.assertEquals("jellyfish2", available.getText());
-       available = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(available);
-
-       // Roll back the parked transaction as well
-       tm.resume(suspended);
-       tm.rollback();
-
-       // Now the first message should be visible, and nothing after it
-       available = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(available);
-       ProxyAssertSupport.assertEquals("jellyfish1", available.getText());
-       available = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(available);
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Sends "kangaroo1" in a transaction that is then suspended and "kangaroo2" in a second
-  * transaction on the same XA session. Checks that only "kangaroo2" is received after the
-  * second transaction commits, and that "kangaroo1" is received once the first transaction is
-  * resumed and committed.
-  */
- @Test
- public void testOneSessionTwoTransactionsCommitSend() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-
-       // The one session shared by both transactions
-       XASession xaSession = xaConnection.createXASession();
-       XAResource resource = xaSession.getXAResource();
-
-       MessageProducer sender = xaSession.createProducer(queue1);
-
-       // First transaction: send, then park the transaction
-       tm.begin();
-
-       Transaction firstTx = tm.getTransaction();
-       firstTx.enlistResource(resource);
-
-       sender.send(xaSession.createTextMessage("kangaroo1"));
-
-       firstTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       Transaction suspended = tm.suspend();
-
-       // Second transaction on the same session: send and commit
-       tm.begin();
-
-       Transaction secondTx = tm.getTransaction();
-       secondTx.enlistResource(resource);
-
-       sender.send(xaSession.createTextMessage("kangaroo2"));
-
-       secondTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       tm.commit();
-
-       // Only kangaroo2 should have been sent so far
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       plainConnection.start();
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       TextMessage committed = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(committed);
-       ProxyAssertSupport.assertEquals("kangaroo2", committed.getText());
-       TextMessage nothing = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-       ProxyAssertSupport.assertNull(nothing);
-
-       // Resume the parked transaction and commit it
-       tm.resume(suspended);
-
-       tm.commit();
-
-       // Its message must now arrive
-       TextMessage late = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(late);
-       ProxyAssertSupport.assertEquals("kangaroo1", late.getText());
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- /**
-  * Checks that the XA resources of two sessions created from the same XA connection report
-  * each other as the same resource manager.
-  */
- @Test
- public void testIsSamRM() throws Exception
- {
-    XAConnection conn = null;
-
-    try
-    {
-       conn = xacf.createXAConnection();
-
-       // Create a session
-       XASession sess1 = conn.createXASession();
-       XAResource res1 = sess1.getXAResource();
-
-       // Create a second session on the same connection
-       XASession sess2 = conn.createXASession();
-       XAResource res2 = sess2.getXAResource();
-
-       ProxyAssertSupport.assertTrue(res1.isSameRM(res2));
-    }
-    finally
-    {
-       // Release the connection even when the assertion fails, as the other tests here do
-       if (conn != null)
-       {
-          conn.close();
-       }
-    }
- }
-
- /**
-  * Sends "kangaroo1" in a transaction that is then suspended and "kangaroo2" in a second
-  * transaction on the same XA session. The second transaction is rolled back, after which no
-  * message must be received; the first is then resumed and committed, after which "kangaroo1"
-  * must be received.
-  */
- @Test
- public void testOneSessionTwoTransactionsRollbackSend() throws Exception
- {
-    XAConnection xaConnection = null;
-    Connection plainConnection = null;
-
-    try
-    {
-       xaConnection = xacf.createXAConnection();
-
-       // The one session shared by both transactions
-       XASession xaSession = xaConnection.createXASession();
-       XAResource resource = xaSession.getXAResource();
-
-       MessageProducer sender = xaSession.createProducer(queue1);
-
-       // First transaction: send, then park the transaction (the resource is only
-       // delisted after the transaction is resumed further down)
-       tm.begin();
-
-       Transaction firstTx = tm.getTransaction();
-       firstTx.enlistResource(resource);
-
-       sender.send(xaSession.createTextMessage("kangaroo1"));
-
-       Transaction suspended = tm.suspend();
-
-       // Second transaction on the same session: send and roll back
-       tm.begin();
-
-       Transaction secondTx = tm.getTransaction();
-       secondTx.enlistResource(resource);
-
-       sender.send(xaSession.createTextMessage("kangaroo2"));
-
-       secondTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       tm.rollback();
-
-       // Nothing should have been sent so far
-       plainConnection = cf.createConnection();
-       Session checkSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       plainConnection.start();
-       MessageConsumer checker = checkSession.createConsumer(queue1);
-       TextMessage nothing = (TextMessage)checker.receive(ActiveMQServerTestCase.MIN_TIMEOUT);
-
-       ProxyAssertSupport.assertNull(nothing);
-
-       // Resume the parked transaction and commit it
-       tm.resume(suspended);
-
-       firstTx.delistResource(resource, XAResource.TMSUCCESS);
-
-       tm.commit();
-
-       // Its message must now arrive
-       TextMessage late = (TextMessage)checker.receive(ActiveMQServerTestCase.MAX_TIMEOUT);
-       ProxyAssertSupport.assertNotNull(late);
-       ProxyAssertSupport.assertEquals("kangaroo1", late.getText());
-    }
-    finally
-    {
-       if (xaConnection != null)
-       {
-          xaConnection.close();
-       }
-       if (plainConnection != null)
-       {
-          plainConnection.close();
-       }
-    }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- /**
-  * Message listener that does nothing but append every message it is handed to
-  * {@link #messages}.
-  */
- static class DummyListener implements MessageListener
- {
-    /** Messages handed to {@link #onMessage(Message)}, in the order they were appended. */
-    public List<Message> messages = new ArrayList<Message>();
-
-    protected JmsTestLogger log = JmsTestLogger.LOGGER;
-
-    public void onMessage(final Message message)
-    {
-       this.messages.add(message);
-    }
- }
-
- /**
-  * Stub {@link XAResource} whose operations are all no-ops, except that {@link #prepare(Xid)}
-  * throws an {@link XAException} with {@code XAER_RMFAIL} when {@link #failOnPrepare} is set.
-  */
- static class DummyXAResource implements XAResource
- {
-    /** When {@code true}, {@link #prepare(Xid)} throws instead of returning {@code XA_OK}. */
-    boolean failOnPrepare;
-
-    /** Creates a resource whose {@code prepare} succeeds. */
-    DummyXAResource()
-    {
-    }
-
-    /**
-     * @param failOnPrepare whether {@link #prepare(Xid)} should throw
-     */
-    DummyXAResource(final boolean failOnPrepare)
-    {
-       this.failOnPrepare = failOnPrepare;
-    }
-
-    @Override
-    public void commit(final Xid arg0, final boolean arg1) throws XAException
-    {
-    }
-
-    @Override
-    public void end(final Xid arg0, final int arg1) throws XAException
-    {
-    }
-
-    @Override
-    public void forget(final Xid arg0) throws XAException
-    {
-    }
-
-    @Override
-    public int getTransactionTimeout() throws XAException
-    {
-       return 0;
-    }
-
-    /** Never reports itself as the same RM as anything, including itself. */
-    @Override
-    public boolean isSameRM(final XAResource arg0) throws XAException
-    {
-       return false;
-    }
-
-    /**
-     * @return {@code XA_OK} unless {@link #failOnPrepare} is set
-     * @throws XAException with error code {@code XAER_RMFAIL} when {@link #failOnPrepare} is set
-     */
-    @Override
-    public int prepare(final Xid arg0) throws XAException
-    {
-       if (failOnPrepare)
-       {
-          throw new XAException(XAException.XAER_RMFAIL);
-       }
-       return XAResource.XA_OK;
-    }
-
-    /**
-     * @return always an empty array; this stub never holds in-doubt branches. An empty array is
-     *         returned rather than {@code null} so a caller can iterate the result unguarded.
-     */
-    @Override
-    public Xid[] recover(final int arg0) throws XAException
-    {
-       return new Xid[0];
-    }
-
-    @Override
-    public void rollback(final Xid arg0) throws XAException
-    {
-    }
-
-    @Override
-    public boolean setTransactionTimeout(final int arg0) throws XAException
-    {
-       return false;
-    }
-
-    @Override
-    public void start(final Xid arg0, final int arg1) throws XAException
-    {
-    }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ConcurrentCloseStressTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ConcurrentCloseStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ConcurrentCloseStressTest.java
deleted file mode 100644
index 30d001b..0000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ConcurrentCloseStressTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.apache.activemq.jms.tests.JmsTestLogger;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Stress test in which producer threads share one connection and reader threads share another,
- * while every thread repeatedly commits, rolls back and closes its own transacted sessions.
- * <p>
- * This test was added to test regression on http://jira.jboss.com/jira/browse/JBMESSAGING-660
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- */
-public class ConcurrentCloseStressTest extends ActiveMQServerTestCase
-{
-   /** Number of producer threads, and also the number of reader threads. */
-   private static final int THREADS_PER_ROLE = 20;
-
-   @BeforeClass
-   public static void stressTestsEnabled()
-   {
-      org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
-   }
-
-   InitialContext ic;
-
-   ConnectionFactory cf;
-
-   Queue queue;
-
-   @Override
-   @Before
-   public void setUp() throws Exception
-   {
-      super.setUp();
-
-      ic = getInitialContext();
-      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-
-      // Start every run from a freshly created, empty queue
-      destroyQueue("TestQueue");
-      createQueue("TestQueue");
-
-      queue = (Queue)ic.lookup("queue/TestQueue");
-
-      log.debug("setup done");
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception
-   {
-      try
-      {
-         destroyQueue("TestQueue");
-      }
-      finally
-      {
-         // Run the superclass tear-down even if the queue could not be destroyed
-         super.tearDown();
-      }
-   }
-
-   /**
-    * Starts all producer and reader threads, waits for them to finish and fails if any of them
-    * recorded an exception. Both connections are closed when the test ends, pass or fail.
-    */
-   @Test
-   public void testProducersAndConsumers() throws Exception
-   {
-      Connection connectionProducer = null;
-      Connection connectionReader = null;
-
-      try
-      {
-         connectionProducer = cf.createConnection();
-         connectionReader = cf.createConnection();
-
-         connectionReader.start();
-         connectionProducer.start(); // try with and without this...
-
-         ProducerThread[] producerThread = new ProducerThread[THREADS_PER_ROLE];
-         ReaderThread[] readerThread = new ReaderThread[THREADS_PER_ROLE];
-         TestThread[] threads = new TestThread[2 * THREADS_PER_ROLE];
-
-         // Producers occupy the first half of the array, readers the second half
-         for (int i = 0; i < THREADS_PER_ROLE; i++)
-         {
-            producerThread[i] = new ProducerThread(i, connectionProducer, queue);
-            readerThread[i] = new ReaderThread(i, connectionReader, queue);
-            threads[i] = producerThread[i];
-            threads[i + THREADS_PER_ROLE] = readerThread[i];
-         }
-
-         for (TestThread thread : threads)
-         {
-            thread.start();
-         }
-
-         for (TestThread thread : threads)
-         {
-            thread.join();
-         }
-
-         boolean hasFailure = false;
-
-         for (TestThread thread : threads)
-         {
-            for (Exception ex : thread.exceptions)
-            {
-               hasFailure = true;
-               log.error("Exception occurred in one of the threads - " + ex, ex);
-            }
-         }
-
-         int messagesProduced = 0;
-         for (ProducerThread producer : producerThread)
-         {
-            messagesProduced += producer.messagesProduced;
-         }
-
-         int messagesRead = 0;
-         for (ReaderThread reader : readerThread)
-         {
-            messagesRead += reader.messagesRead;
-         }
-
-         // The tallies are informational only; the test outcome depends on exceptions alone
-         log.debug("messages produced: " + messagesProduced + ", messages read: " + messagesRead);
-
-         if (hasFailure)
-         {
-            ProxyAssertSupport.fail("An exception has occurred in one of the threads");
-         }
-      }
-      finally
-      {
-         try
-         {
-            if (connectionProducer != null)
-            {
-               connectionProducer.close();
-            }
-         }
-         finally
-         {
-            if (connectionReader != null)
-            {
-               connectionReader.close();
-            }
-         }
-      }
-   }
-
-   /** Base class of the worker threads; collects the exceptions a worker runs into. */
-   static class TestThread extends Thread
-   {
-      /** Exceptions caught by this thread; read by the test after the thread has been joined. */
-      List<Exception> exceptions = new ArrayList<Exception>();
-
-      protected int index;
-
-      public int messageCount = 0;
-
-      /**
-       * Closes {@code session} if it is not {@code null}, recording rather than propagating any
-       * exception the close raises.
-       */
-      protected void closeSession(final Session session)
-      {
-         if (session == null)
-         {
-            return;
-         }
-         try
-         {
-            session.close();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-            exceptions.add(e);
-         }
-      }
-   }
-
-   /**
-    * Receives messages until a receive times out after five seconds, alternating between
-    * commit and rollback and replacing its session after every seventh message.
-    */
-   static class ReaderThread extends TestThread
-   {
-      private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
-      Connection conn;
-
-      Queue queue;
-
-      int messagesRead = 0;
-
-      public ReaderThread(final int index, final Connection conn, final Queue queue) throws Exception
-      {
-         this.index = index;
-         this.conn = conn;
-         this.queue = queue;
-      }
-
-      @Override
-      public void run()
-      {
-         int commitCounter = 0;
-         Session session = null;
-         try
-         {
-            session = conn.createSession(true, Session.SESSION_TRANSACTED);
-            MessageConsumer consumer = session.createConsumer(queue);
-
-            int lastCount = messageCount;
-            while (true)
-            {
-               TextMessage message = (TextMessage)consumer.receive(5000);
-               if (message == null)
-               {
-                  break;
-               }
-               ReaderThread.log.debug("read message " + message.getText());
-
-               // alternating commits and rollbacks
-               if (commitCounter++ % 2 == 0)
-               {
-                  messagesRead += messageCount - lastCount;
-                  lastCount = messageCount;
-                  ReaderThread.log.debug("commit");
-                  session.commit();
-               }
-               else
-               {
-                  lastCount = messageCount;
-                  ReaderThread.log.debug("rollback");
-                  session.rollback();
-               }
-
-               messageCount++;
-
-               // Every seventh message, throw the session away and continue on a new one
-               if (messageCount % 7 == 0)
-               {
-                  session.close();
-
-                  session = conn.createSession(true, Session.SESSION_TRANSACTED);
-                  consumer = session.createConsumer(queue);
-               }
-            }
-
-            messagesRead += messageCount - lastCount;
-
-            session.commit();
-            consumer.close();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-            exceptions.add(e);
-         }
-         finally
-         {
-            // Also reached after a failure, so the current session is not left open
-            closeSession(session);
-         }
-      }
-
-   }
-
-   /**
-    * Ten times over: opens a transacted session, sends twenty messages while alternating
-    * between commit and rollback after each send, then closes the session.
-    */
-   static class ProducerThread extends TestThread
-   {
-      private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
-      Connection conn;
-
-      Queue queue;
-
-      int messagesProduced = 0;
-
-      public ProducerThread(final int index, final Connection conn, final Queue queue) throws Exception
-      {
-         this.index = index;
-         this.conn = conn;
-         this.queue = queue;
-      }
-
-      @Override
-      public void run()
-      {
-         for (int i = 0; i < 10; i++)
-         {
-            Session sess = null;
-            try
-            {
-               int lastMessage = messageCount;
-               sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-               MessageProducer producer = sess.createProducer(queue);
-
-               for (int j = 0; j < 20; j++)
-               {
-                  producer.send(sess.createTextMessage("Message " + i + ", " + j));
-
-                  if (j % 2 == 0)
-                  {
-                     ProducerThread.log.debug("commit");
-                     messagesProduced += messageCount - lastMessage;
-                     lastMessage = messageCount;
-
-                     sess.commit();
-                  }
-                  else
-                  {
-                     ProducerThread.log.debug("rollback");
-                     lastMessage = messageCount;
-                     sess.rollback();
-                  }
-                  messageCount++;
-
-               }
-
-               messagesProduced += messageCount - lastMessage;
-               sess.commit();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-               exceptions.add(e);
-            }
-            finally
-            {
-               // Also reached after a failure, so a failed round does not leak its session
-               closeSession(sess);
-            }
-         }
-      }
-
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/CorruptMessageStressTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/CorruptMessageStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/CorruptMessageStressTest.java
deleted file mode 100644
index e5fde58..0000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/CorruptMessageStressTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * A stress test written to investigate http://jira.jboss.org/jira/browse/JBMESSAGING-362
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- */
-public class CorruptMessageStressTest extends ActiveMQServerTestCase
-{
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- public static int PRODUCER_COUNT = 30;
-
- public static int MESSAGE_COUNT = 10000;
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private InitialContext ic;
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- @Test
- public void testMultipleSenders() throws Exception
- {
- ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- Queue queue = (Queue)ic.lookup("/queue/StressTestQueue");
- drainDestination(cf, queue);
-
- Connection conn = cf.createConnection();
-
- Session[] sessions = new Session[CorruptMessageStressTest.PRODUCER_COUNT];
- MessageProducer[] producers = new MessageProducer[CorruptMessageStressTest.PRODUCER_COUNT];
-
- for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
- {
- sessions[i] = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producers[i] = sessions[i].createProducer(queue);
- producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
-
- Thread[] threads = new Thread[CorruptMessageStressTest.PRODUCER_COUNT];
-
- for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
- {
- threads[i] = new Thread(new Sender(sessions[i], producers[i]), "Sender Thread #" + i);
- threads[i].start();
- }
-
- // wait for the threads to finish
-
- for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
- {
- threads[i].join();
- }
-
- conn.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- // ServerManagement.start("all");
- ic = getInitialContext();
- createQueue("StressTestQueue");
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- super.tearDown();
- destroyQueue("StressTestQueue");
- ic.close();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- private class Sender implements Runnable
- {
- private final Session session;
-
- private final MessageProducer producer;
-
- private int count = 0;
-
- public Sender(final Session session, final MessageProducer producer)
- {
- this.session = session;
- this.producer = producer;
- }
-
- public void run()
- {
- while (true)
- {
- if (count == CorruptMessageStressTest.MESSAGE_COUNT)
- {
- break;
- }
-
- try
- {
- Message m = session.createMessage();
- m.setStringProperty("XXX", "XXX-VALUE");
- m.setStringProperty("YYY", "YYY-VALUE");
- producer.send(m);
- count++;
- }
- catch (Exception e)
- {
- log.error("Sender thread failed", e);
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/JMSStressTestBase.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/JMSStressTestBase.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/JMSStressTestBase.java
deleted file mode 100644
index 93b368c..0000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/JMSStressTestBase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Topic;
-import javax.jms.XASession;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.junit.After;
-import org.junit.BeforeClass;
-
-/**
- *
- * Base class for stress tests
- *
- * @author <a href="tim.fox@jboss.com">Tim Fox</a>
- *
-*/
-public abstract class JMSStressTestBase extends ActiveMQServerTestCase
-{
- public static final boolean STRESS_TESTS_ENABLED = false;
-
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- protected static final int NUM_PERSISTENT_MESSAGES = 4000;
-
- protected static final int NUM_NON_PERSISTENT_MESSAGES = 6000;
-
- protected static final int NUM_PERSISTENT_PRESEND = 5000;
-
- protected static final int NUM_NON_PERSISTENT_PRESEND = 3000;
-
- protected ConnectionFactory cf;
-
- protected Destination topic;
-
- protected Destination destinationQueue1;
- protected Destination destinationQueue2;
- protected Destination destinationQueue3;
- protected Destination destinationQueue4;
-
- protected Topic topic1;
- protected Topic topic2;
- protected Topic topic3;
- protected Topic topic4;
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- super.tearDown();
- if (checkNoMessageData())
- {
- ProxyAssertSupport.fail("Message data still exists");
- }
- }
-
- protected void runRunners(final Runner[] runners) throws Exception
- {
- Thread[] threads = new Thread[runners.length];
- for (int i = 0; i < runners.length; i++)
- {
- threads[i] = new Thread(runners[i]);
- threads[i].start();
- }
-
- for (int i = 0; i < runners.length; i++)
- {
- threads[i].join();
- }
-
- for (int i = 0; i < runners.length; i++)
- {
- if (runners[i].isFailed())
- {
- ProxyAssertSupport.fail("Runner " + i + " failed");
- log.error("runner failed");
- }
- }
- }
-
- protected void tweakXASession(final XASession sess)
- {
-
- }
-}