You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/02 17:59:40 UTC

svn commit: r411195 [2/2] - in /incubator/servicemix/trunk/servicemix-core: ./ src/main/java/org/apache/servicemix/jbi/framework/ src/main/java/org/apache/servicemix/jbi/management/ src/main/java/org/apache/servicemix/jbi/messaging/ src/main/java/org/a...

Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java?rev=411195&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java Fri Jun  2 08:59:39 2006
@@ -0,0 +1,532 @@
+package org.apache.servicemix.jbi.messaging;
+
+import java.sql.Connection;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ManagedConnectionFactory;
+import javax.sql.DataSource;
+import javax.sql.XADataSource;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+import org.apache.geronimo.connector.outbound.GenericConnectionManager;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
+import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.components.util.OutBinding;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.nmr.flow.Flow;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.jdbc.JdbcStoreFactory;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.tranql.connector.AllExceptionsAreFatalSorter;
+import org.tranql.connector.jdbc.AbstractXADataSourceMCF;
+
+public class TransactionsTest extends TestCase {
+
+    // Timeout passed to client.receive(...) and sendSync(...) throughout this test
+    // (presumably milliseconds -- TODO confirm against the ServiceMixClient / DeliveryChannel API).
+    public static final long TIMEOUT = 1000;
+    
+    // JBI container under test; configured and started in setUp(), shut down in tearDown().
+    private JBIContainer jbi;
+    // ActiveMQ broker started in setUp() on tcp://localhost:61616.
+    private BrokerService broker;
+    // Transaction manager used by the tests to begin / commit / roll back and handed to the container.
+    private TransactionManager tm;
+    // Client used by the tests to create, send and receive exchanges.
+    private ServiceMixClient client;
+    // Data source created from DerbyDataSourceMCF; backs the JDBC store.
+    private DataSource dataSource;
+    // Connection opened on dataSource in setUp() and closed in tearDown().
+    private Connection connection;
+    // Transactional JDBC store in which the test providers record every exchange they process.
+    private Store store;
+    // Container listener whose assertExchangeCompleted() is checked in tearDown().
+    private ExchangeCompletedListener listener;
+    
+    /**
+     * Builds the fixture used by every test: an ActiveMQ broker, a Geronimo transaction
+     * manager, a Derby-backed transactional JDBC store, and an embedded JBI container
+     * running a SEDA flow and a JCA flow with auto-enlistment in transactions enabled.
+     */
+    protected void setUp() throws Exception {
+        // Non-persistent ActiveMQ broker without JMX, listening on tcp://localhost:61616
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+
+        // Geronimo transaction manager, used through the JTA TransactionManager interface
+        TransactionManagerImpl geronimoTm = new TransactionManagerImpl(600, null, null);
+        TransactionContextManager txContextManager = new TransactionContextManager(geronimoTm, geronimoTm);
+        tm = (TransactionManager) new GeronimoTransactionManager(txContextManager);
+
+        // Embedded database used to observe what survives a commit / rollback
+        ConnectionManager connectionManager = new GenericConnectionManager(
+                        new XATransactions(true, true),
+                        new NoPool(),
+                        false,
+                        null,
+                        txContextManager,
+                        "connectionManager",
+                        GenericConnectionManager.class.getClassLoader());
+        ManagedConnectionFactory derbyMcf = new DerbyDataSourceMCF("target/testdb");
+        dataSource = (DataSource) derbyMcf.createConnectionFactory(connectionManager);
+
+        connection = dataSource.getConnection();
+
+        // Transactional JDBC store in which the test providers record the exchanges they handle
+        JdbcStoreFactory jdbcStoreFactory = new JdbcStoreFactory();
+        jdbcStoreFactory.setDataSource(dataSource);
+        jdbcStoreFactory.setTransactional(true);
+        store = jdbcStoreFactory.open("store");
+
+        // The JCA flow shares the transaction context manager created above
+        JCAFlow txFlow = new JCAFlow();
+        txFlow.setTransactionContextManager(txContextManager);
+        Flow[] flows = new Flow[] { new SedaFlow(), txFlow };
+
+        // Embedded container, no MBean server, exchanges auto-enlisted in the current transaction
+        jbi = new JBIContainer();
+        jbi.setFlows(flows);
+        jbi.setEmbedded(true);
+        jbi.setUseMBeanServer(false);
+        jbi.setCreateMBeanServer(false);
+        jbi.setTransactionManager(tm);
+        jbi.setAutoEnlistInTransaction(true);
+        listener = new ExchangeCompletedListener();
+        jbi.addListener(listener);
+        jbi.init();
+        jbi.start();
+
+        client = new DefaultServiceMixClient(jbi);
+    }
+    
+    /**
+     * Checks the listener's exchange-completion assertion, then releases the fixture.
+     *
+     * The cleanup steps are chained with try/finally so that a failing assertion, or a
+     * failure in an earlier cleanup step, can no longer leave the container running, the
+     * broker bound to tcp://localhost:61616, or the JDBC connection open for the next test.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            try {
+                jbi.shutDown();
+                // Same short pause as before between container shutdown and broker stop
+                Thread.sleep(100);
+            } finally {
+                try {
+                    broker.stop();
+                } finally {
+                    connection.close();
+                }
+            }
+        }
+    }
+    
+    /**
+     * Creates an InOnly exchange addressed to the service named "service" whose in
+     * message carries a small XML payload.
+     */
+    protected InOnly createInOnly() throws Exception {
+        InOnly exchange = client.createInOnlyExchange();
+        NormalizedMessage in = exchange.getInMessage();
+        in.setContent(new StringSource("<hello>world</hello>"));
+        exchange.setService(new QName("service"));
+        return exchange;
+    }
+    
+    /**
+     * Creates an InOut exchange addressed to the service named "service" whose in
+     * message carries a small XML payload.
+     */
+    protected InOut createInOut() throws Exception {
+        InOut exchange = client.createInOutExchange();
+        NormalizedMessage in = exchange.getInMessage();
+        in.setContent(new StringSource("<hello>world</hello>"));
+        exchange.setService(new QName("service"));
+        return exchange;
+    }
+    
+    /**
+     * InOnly, asynchronous send, Listener provider, commit.
+     * Asserts that nothing is received while the transaction is open, and that after the
+     * commit the exchange comes back DONE and transacted and has an entry in the store.
+     */
+    public void testInOnlyAsyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(false, false), "target");
+
+        MessageExchange sent = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(sent);
+        // nothing is expected back before the commit
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        MessageExchange received = client.receive(TIMEOUT);
+        assertNotNull(received);
+        assertEquals(ExchangeStatus.DONE, received.getStatus());
+        assertTrue(received.isTransacted());
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        assertNotNull(store.load(received.getExchangeId()));
+    }
+    
+    /**
+     * InOnly, asynchronous send, Listener provider that marks its transaction rollback-only.
+     * Asserts that nothing is received either before or after the consumer's commit and
+     * that the store holds no entry for the exchange.
+     */
+    public void testInOnlyAsyncSendAndListenerWithRollback() throws Exception {
+        jbi.activateComponent(new Listener(false, true), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(exchange);
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        // the provider rolled back: no answer and nothing stored
+        assertNull(client.receive(TIMEOUT));
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Listener provider, commit.
+     * Asserts that the transaction is still active after sendSync, that the exchange is
+     * DONE and transacted, and that the store has an entry once the commit is done.
+     */
+    public void testInOnlySyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(false, false), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        // the consumer's transaction must still be usable after the synchronous round trip
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNotNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Listener provider that calls setRollbackOnly.
+     * Asserts that the consumer sees the transaction marked for rollback after sendSync,
+     * and that after rolling back the store holds no entry for the exchange.
+     */
+    public void testInOnlySyncSendAndListenerWithProviderRollback() throws Exception {
+        jbi.activateComponent(new Listener(false, true), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        // the provider's setRollbackOnly must be visible on the consumer side
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.rollback();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Listener provider; the consumer itself marks the
+     * transaction rollback-only after the exchange is done.
+     * Asserts that after the rollback the store holds no entry for the exchange.
+     */
+    public void testInOnlySyncSendAndListenerWithConsumerRollback() throws Exception {
+        jbi.activateComponent(new Listener(false, false), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        // consumer-side rollback
+        tm.setRollbackOnly();
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.rollback();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, asynchronous send, Async provider, commit.
+     * Asserts that nothing is received while the transaction is open, and that after the
+     * commit the exchange comes back DONE and transacted and has an entry in the store.
+     */
+    public void testInOnlyAsyncSendAndPoll() throws Exception {
+        jbi.activateComponent(new Async(false, false), "target");
+
+        MessageExchange sent = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(sent);
+        // nothing is expected back before the commit
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        MessageExchange received = client.receive(TIMEOUT);
+        assertNotNull(received);
+        assertEquals(ExchangeStatus.DONE, received.getStatus());
+        assertTrue(received.isTransacted());
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        assertNotNull(store.load(received.getExchangeId()));
+    }
+    
+    /**
+     * InOnly, asynchronous send, Async provider that marks its transaction rollback-only.
+     * Asserts that nothing is received either before or after the consumer's commit and
+     * that the store holds no entry for the exchange.
+     */
+    public void testInOnlyAsyncSendAndPollWithRollback() throws Exception {
+        jbi.activateComponent(new Async(false, true), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(exchange);
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        // the provider rolled back: no answer and nothing stored
+        assertNull(client.receive(TIMEOUT));
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Async provider, commit.
+     * Asserts that the transaction is still active after sendSync, that the exchange is
+     * DONE and transacted, and that the store has an entry once the commit is done.
+     */
+    public void testInOnlySyncSendAndPoll() throws Exception {
+        jbi.activateComponent(new Async(false, false), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        // the consumer's transaction must still be usable after the synchronous round trip
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNotNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Async provider that calls setRollbackOnly.
+     * Asserts that the consumer sees the transaction marked for rollback after sendSync,
+     * and that after rolling back the store holds no entry for the exchange.
+     */
+    public void testInOnlySyncSendAndPollWithProviderRollback() throws Exception {
+        jbi.activateComponent(new Async(false, true), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        // the provider's setRollbackOnly must be visible on the consumer side
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.rollback();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOnly, synchronous send, Async provider; the consumer itself marks the
+     * transaction rollback-only after the exchange is done.
+     * Asserts that after the rollback the store holds no entry for the exchange.
+     */
+    public void testInOnlySyncSendAndPollWithConsumerRollback() throws Exception {
+        jbi.activateComponent(new Async(false, false), "target");
+
+        MessageExchange exchange = createInOnly();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        // consumer-side rollback
+        tm.setRollbackOnly();
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        tm.rollback();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNull(store.load(exchangeId));
+    }
+
+    /**
+     * InOut, asynchronous send by the consumer, asynchronous reply by a Listener provider.
+     * Asserts that nothing is received before the commit; that the reply then arrives
+     * ACTIVE and transacted with a transaction active on the calling thread; and that
+     * after done() no transaction remains and the store has an entry for the exchange.
+     */
+    public void testInOutAsyncSendAndAsyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(false, false), "target");
+
+        MessageExchange request = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(request);
+        // nothing is expected back before the commit
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        MessageExchange reply = client.receive(TIMEOUT);
+        assertNotNull(reply);
+        assertEquals(ExchangeStatus.ACTIVE, reply.getStatus());
+        assertTrue(reply.isTransacted());
+        // receiving the reply leaves a transaction active on this thread until done()
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.done(reply);
+
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+        assertNotNull(store.load(reply.getExchangeId()));
+    }
+
+    /*
+     * NOT SUPPORTED: asynchronous send() by the consumer combined with a synchronous sendSync() reply by the provider.
+     *
+    public void testInOutAsyncSendAndSyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(true, false), "target");
+        
+        MessageExchange me = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(me);
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+        
+        me = client.receive(TIMEOUT);
+        assertNotNull(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertTrue(me.isTransacted());
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.done(me);
+        
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+        assertNotNull(store.load(me.getExchangeId()));
+    }
+    */
+    
+    /*
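+     * NOT SUPPORTED: synchronous sendSync() by the consumer combined with an asynchronous send() reply by the provider.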
+     *
+    public void testInOutSyncSendAndAsyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(false, false), "target");
+        
+        MessageExchange me = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(me, TIMEOUT);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertTrue(me.isTransacted());
+        client.done(me);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+        
+        assertNotNull(store.load(me.getExchangeId()));
+    }
+    */
+    
+    /**
+     * InOut, synchronous send by the consumer, synchronous reply by a Listener provider.
+     * Asserts that the transaction stays active across sendSync and done(), that the
+     * exchange is ACTIVE and transacted when it returns, and that the store has an entry
+     * for it after the commit.
+     */
+    public void testInOutSyncSendAndSyncSendAndListener() throws Exception {
+        jbi.activateComponent(new Listener(true, false), "target");
+
+        MessageExchange exchange = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        client.done(exchange);
+        // done() must not end the consumer's transaction
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNotNull(store.load(exchangeId));
+    }
+    
+    /**
+     * InOut, asynchronous send by the consumer, asynchronous reply by an Async provider.
+     * Asserts that nothing is received before the commit; that the reply then arrives
+     * ACTIVE and transacted with a transaction active on the calling thread; and that
+     * after done() no transaction remains and the store has an entry for the exchange.
+     */
+    public void testInOutAsyncSendAndAsyncSendAndPoll() throws Exception {
+        jbi.activateComponent(new Async(false, false), "target");
+
+        MessageExchange request = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.send(request);
+        // nothing is expected back before the commit
+        assertNull(client.receive(TIMEOUT));
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        MessageExchange reply = client.receive(TIMEOUT);
+        assertNotNull(reply);
+        assertEquals(ExchangeStatus.ACTIVE, reply.getStatus());
+        assertTrue(reply.isTransacted());
+        // receiving the reply leaves a transaction active on this thread until done()
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.done(reply);
+
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+        assertNotNull(store.load(reply.getExchangeId()));
+    }
+    
+    /**
+     * InOut, synchronous send by the consumer, synchronous reply by an Async provider.
+     * Asserts that the transaction stays active across sendSync and done(), that the
+     * exchange is ACTIVE and transacted when it returns, and that the store has an entry
+     * for it after the commit.
+     */
+    public void testInOutSyncSendAndSyncSendAndPoll() throws Exception {
+        jbi.activateComponent(new Async(true, false), "target");
+
+        MessageExchange exchange = createInOut();
+        tm.begin();
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        client.sendSync(exchange, TIMEOUT);
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, exchange.getStatus());
+        assertTrue(exchange.isTransacted());
+        client.done(exchange);
+        // done() must not end the consumer's transaction
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        tm.commit();
+        assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
+
+        final String exchangeId = exchange.getExchangeId();
+        assertNotNull(store.load(exchangeId));
+    }
+    
+    /**
+     * Test provider built on {@link OutBinding}, registered for service "service" /
+     * endpoint "endpoint" and used by the *AndPoll tests.
+     *
+     * For every ACTIVE exchange it records the exchange in the enclosing test's store,
+     * optionally marks the current transaction rollback-only, and then answers:
+     * InOnly exchanges are set to DONE and sent back, any other exchange gets an "out"
+     * message carrying the content of its "in" message.
+     */
+    protected class Async extends OutBinding {
+        // when true, the reply to a non-InOnly exchange is sent with sendSync instead of send
+        private boolean sync;
+        // when true, the current transaction is marked rollback-only after the store call
+        private boolean rollback;
+
+        /**
+         * @param sync     reply to non-InOnly exchanges synchronously
+         * @param rollback mark the current transaction rollback-only while processing
+         */
+        public Async(boolean sync, boolean rollback) {
+            this.sync = sync;
+            this.rollback = rollback;
+            setService(new QName("service"));
+            setEndpoint("endpoint");
+        }
+
+        /**
+         * Stores, optionally marks for rollback, and answers the given exchange.
+         * Exchanges that are not ACTIVE are ignored.
+         *
+         * @throws MessagingException wrapping any failure of the store or of setRollbackOnly
+         */
+        protected void process(MessageExchange exchange, NormalizedMessage message) throws Exception {
+            if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+                return;
+            }
+            try {
+                store.store(exchange.getExchangeId(), exchange);
+            } catch (Exception e) {
+                throw new MessagingException(e);
+            }
+            if (rollback) {
+                try {
+                    tm.setRollbackOnly();
+                } catch (Exception e) {
+                    throw new MessagingException(e);
+                }
+            }
+            if (exchange instanceof InOnly) {
+                exchange.setStatus(ExchangeStatus.DONE);
+                getDeliveryChannel().send(exchange);
+            } else {
+                // Echo the request content back as the "out" message
+                NormalizedMessage msg = exchange.createMessage();
+                msg.setContent(exchange.getMessage("in").getContent());
+                exchange.setMessage(msg, "out");
+                if (sync) {
+                    // Bounded wait, as in Listener: an unanswered reply must not hang the test run
+                    getDeliveryChannel().sendSync(exchange, TIMEOUT);
+                } else {
+                    getDeliveryChannel().send(exchange);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Test provider implemented as a {@link MessageExchangeListener}, registered for
+     * service "service" / endpoint "endpoint" and used by the *AndListener tests.
+     *
+     * For every ACTIVE exchange it records the exchange in the enclosing test's store,
+     * optionally marks the current transaction rollback-only, and then answers it.
+     */
+    protected class Listener extends ComponentSupport implements MessageExchangeListener {
+        // when true, the reply to a non-InOnly exchange is sent with sendSync(exchange, TIMEOUT)
+        private boolean sync;
+        // when true, the current transaction is marked rollback-only after the store call
+        private boolean rollback;
+
+        public Listener(boolean sync, boolean rollback) {
+            this.sync = sync;
+            this.rollback = rollback;
+            setService(new QName("service"));
+            setEndpoint("endpoint");
+        }
+
+        /** Handles ACTIVE exchanges only; anything else is ignored. */
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                persist(exchange);
+                if (rollback) {
+                    markRollbackOnly();
+                }
+                answer(exchange);
+            }
+        }
+
+        /** Records the exchange in the store under its exchange id. */
+        private void persist(MessageExchange exchange) throws MessagingException {
+            try {
+                store.store(exchange.getExchangeId(), exchange);
+            } catch (Exception storeFailure) {
+                throw new MessagingException(storeFailure);
+            }
+        }
+
+        /** Marks the current transaction rollback-only. */
+        private void markRollbackOnly() throws MessagingException {
+            try {
+                tm.setRollbackOnly();
+            } catch (Exception txFailure) {
+                throw new MessagingException(txFailure);
+            }
+        }
+
+        /** Completes an InOnly exchange, or echoes the "in" content back as the "out" message. */
+        private void answer(MessageExchange exchange) throws MessagingException {
+            if (!(exchange instanceof InOnly)) {
+                NormalizedMessage reply = exchange.createMessage();
+                reply.setContent(exchange.getMessage("in").getContent());
+                exchange.setMessage(reply, "out");
+                if (sync) {
+                    getDeliveryChannel().sendSync(exchange, TIMEOUT);
+                } else {
+                    getDeliveryChannel().send(exchange);
+                }
+                return;
+            }
+            exchange.setStatus(ExchangeStatus.DONE);
+            getDeliveryChannel().send(exchange);
+        }
+
+    }
+    
+    /**
+     * Managed connection factory wrapping an embedded Derby XA data source for the given
+     * database name; reports no user name and no password.
+     */
+    public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF {
+        private static final long serialVersionUID = 7971682207810098396L;
+
+        protected DerbyDataSourceMCF(String dbName) {
+            super(createXADS(dbName), new AllExceptionsAreFatalSorter());
+        }
+
+        /** Builds the embedded XA data source with the "create" database option set. */
+        protected static XADataSource createXADS(String dbName) {
+            EmbeddedXADataSource derbyXaDataSource = new EmbeddedXADataSource();
+            derbyXaDataSource.setDatabaseName(dbName);
+            derbyXaDataSource.setCreateDatabase("create");
+            return derbyXaDataSource;
+        }
+
+        /** No user name is configured. */
+        public String getUserName() {
+            return null;
+        }
+
+        /** No password is configured. */
+        public String getPassword() {
+            return null;
+        }
+    }
+    
+}

Modified: incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties Fri Jun  2 08:59:39 2006
@@ -11,7 +11,7 @@
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %-32.32C %4L | %m%n
 
 # File appender
 log4j.appender.out=org.apache.log4j.FileAppender