You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/02 17:59:40 UTC
svn commit: r411195 [2/2] - in /incubator/servicemix/trunk/servicemix-core:
./ src/main/java/org/apache/servicemix/jbi/framework/
src/main/java/org/apache/servicemix/jbi/management/
src/main/java/org/apache/servicemix/jbi/messaging/ src/main/java/org/a...
Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java?rev=411195&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java Fri Jun 2 08:59:39 2006
@@ -0,0 +1,532 @@
+package org.apache.servicemix.jbi.messaging;
+
+import java.sql.Connection;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ManagedConnectionFactory;
+import javax.sql.DataSource;
+import javax.sql.XADataSource;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+import org.apache.geronimo.connector.outbound.GenericConnectionManager;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
+import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.components.util.OutBinding;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.nmr.flow.Flow;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.jdbc.JdbcStoreFactory;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.tranql.connector.AllExceptionsAreFatalSorter;
+import org.tranql.connector.jdbc.AbstractXADataSourceMCF;
+
+public class TransactionsTest extends TestCase {
+
+ public static final long TIMEOUT = 1000;
+
+ private JBIContainer jbi;
+ private BrokerService broker;
+ private TransactionManager tm;
+ private ServiceMixClient client;
+ private DataSource dataSource;
+ private Connection connection;
+ private Store store;
+ private ExchangeCompletedListener listener;
+
+ protected void setUp() throws Exception {
+
+ // Create an AMQ broker
+ broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.setPersistent(false);
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+
+ TransactionManagerImpl exTransactionManager = new TransactionManagerImpl(600, null, null);
+ TransactionContextManager transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager);
+ tm = (TransactionManager) new GeronimoTransactionManager(transactionContextManager);
+
+ // Create an embedded database for testing tx results when commit / rollback
+ ConnectionManager cm = new GenericConnectionManager(
+ new XATransactions(true, true),
+ new NoPool(),
+ false,
+ null,
+ transactionContextManager,
+ "connectionManager",
+ GenericConnectionManager.class.getClassLoader());
+ ManagedConnectionFactory mcf = new DerbyDataSourceMCF("target/testdb");
+ dataSource = (DataSource) mcf.createConnectionFactory(cm);
+
+ connection = dataSource.getConnection();
+
+ JdbcStoreFactory storeFactory = new JdbcStoreFactory();
+ storeFactory.setDataSource(dataSource);
+ storeFactory.setTransactional(true);
+ store = storeFactory.open("store");
+
+ JCAFlow jcaFlow = new JCAFlow();
+ jcaFlow.setTransactionContextManager(transactionContextManager);
+
+ jbi = new JBIContainer();
+ jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow });
+ jbi.setEmbedded(true);
+ jbi.setUseMBeanServer(false);
+ jbi.setCreateMBeanServer(false);
+ jbi.setTransactionManager(tm);
+ jbi.setAutoEnlistInTransaction(true);
+ listener = new ExchangeCompletedListener();
+ jbi.addListener(listener);
+ jbi.init();
+ jbi.start();
+
+ client = new DefaultServiceMixClient(jbi);
+ }
+
+ protected void tearDown() throws Exception {
+ listener.assertExchangeCompleted();
+ jbi.shutDown();
+ Thread.sleep(100);
+ broker.stop();
+ connection.close();
+ }
+
+ protected InOnly createInOnly() throws Exception {
+ InOnly me = client.createInOnlyExchange();
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ me.setService(new QName("service"));
+ return me;
+ }
+
+ protected InOut createInOut() throws Exception {
+ InOut me = client.createInOutExchange();
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ me.setService(new QName("service"));
+ return me;
+ }
+
+ public void testInOnlyAsyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ me = client.receive(TIMEOUT);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlyAsyncSendAndListenerWithRollback() throws Exception {
+ jbi.activateComponent(new Listener(false, true), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(client.receive(TIMEOUT));
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndListenerWithProviderRollback() throws Exception {
+ jbi.activateComponent(new Listener(false, true), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.rollback();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndListenerWithConsumerRollback() throws Exception {
+ jbi.activateComponent(new Listener(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ tm.setRollbackOnly();
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.rollback();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlyAsyncSendAndPoll() throws Exception {
+ jbi.activateComponent(new Async(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ me = client.receive(TIMEOUT);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlyAsyncSendAndPollWithRollback() throws Exception {
+ jbi.activateComponent(new Async(false, true), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(client.receive(TIMEOUT));
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndPoll() throws Exception {
+ jbi.activateComponent(new Async(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndPollWithProviderRollback() throws Exception {
+ jbi.activateComponent(new Async(false, true), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.rollback();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOnlySyncSendAndPollWithConsumerRollback() throws Exception {
+ jbi.activateComponent(new Async(false, false), "target");
+
+ MessageExchange me = createInOnly();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ tm.setRollbackOnly();
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ assertTrue(me.isTransacted());
+ tm.rollback();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOutAsyncSendAndAsyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(false, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ me = client.receive(TIMEOUT);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.done(me);
+
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ /*
+ * NOT SUPPORTED
+ *
+ public void testInOutAsyncSendAndSyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(true, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ me = client.receive(TIMEOUT);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.done(me);
+
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+ */
+
+ /*
+ * NOT SUPPORTED
+ *
+ public void testInOutSyncSendAndAsyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(false, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ client.done(me);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+ */
+
+ public void testInOutSyncSendAndSyncSendAndListener() throws Exception {
+ jbi.activateComponent(new Listener(true, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ client.done(me);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOutAsyncSendAndAsyncSendAndPoll() throws Exception {
+ jbi.activateComponent(new Async(false, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.send(me);
+ assertNull(client.receive(TIMEOUT));
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ me = client.receive(TIMEOUT);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.done(me);
+
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ public void testInOutSyncSendAndSyncSendAndPoll() throws Exception {
+ jbi.activateComponent(new Async(true, false), "target");
+
+ MessageExchange me = createInOut();
+ tm.begin();
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ client.sendSync(me, TIMEOUT);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertTrue(me.isTransacted());
+ client.done(me);
+ assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+ tm.commit();
+ assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+ assertNotNull(store.load(me.getExchangeId()));
+ }
+
+ protected class Async extends OutBinding {
+ private boolean sync;
+ private boolean rollback;
+ public Async(boolean sync, boolean rollback) {
+ this.sync = sync;
+ this.rollback = rollback;
+ setService(new QName("service"));
+ setEndpoint("endpoint");
+ }
+ protected void process(MessageExchange exchange, NormalizedMessage message) throws Exception {
+ if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ return;
+ }
+ try {
+ store.store(exchange.getExchangeId(), exchange);
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
+ if (rollback) {
+ try {
+ tm.setRollbackOnly();
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
+ }
+ if (exchange instanceof InOnly) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ getDeliveryChannel().send(exchange);
+ } else {
+ NormalizedMessage msg = exchange.createMessage();
+ msg.setContent(exchange.getMessage("in").getContent());
+ exchange.setMessage(msg, "out");
+ if (sync) {
+ getDeliveryChannel().sendSync(exchange);
+ } else {
+ getDeliveryChannel().send(exchange);
+ }
+ }
+ }
+ }
+
+ protected class Listener extends ComponentSupport implements MessageExchangeListener {
+ private boolean sync;
+ private boolean rollback;
+ public Listener(boolean sync, boolean rollback) {
+ this.sync = sync;
+ this.rollback = rollback;
+ setService(new QName("service"));
+ setEndpoint("endpoint");
+ }
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ return;
+ }
+ try {
+ store.store(exchange.getExchangeId(), exchange);
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
+ if (rollback) {
+ try {
+ tm.setRollbackOnly();
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
+ }
+ if (exchange instanceof InOnly) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ getDeliveryChannel().send(exchange);
+ } else {
+ NormalizedMessage msg = exchange.createMessage();
+ msg.setContent(exchange.getMessage("in").getContent());
+ exchange.setMessage(msg, "out");
+ if (sync) {
+ getDeliveryChannel().sendSync(exchange, TIMEOUT);
+ } else {
+ getDeliveryChannel().send(exchange);
+ }
+ }
+ }
+
+ }
+
+ public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF {
+ private static final long serialVersionUID = 7971682207810098396L;
+ protected DerbyDataSourceMCF(String dbName) {
+ super(createXADS(dbName), new AllExceptionsAreFatalSorter());
+ }
+ public String getPassword() {
+ return null;
+ }
+ public String getUserName() {
+ return null;
+ }
+ protected static XADataSource createXADS(String dbName) {
+ EmbeddedXADataSource xads = new EmbeddedXADataSource();
+ xads.setDatabaseName(dbName);
+ xads.setCreateDatabase("create");
+ return xads;
+ }
+ }
+
+}
Modified: incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties Fri Jun 2 08:59:39 2006
@@ -11,7 +11,7 @@
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %-32.32C %4L | %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender