You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/01/28 21:20:07 UTC

svn commit: r1064858 - in /activemq/trunk: activemq-camel/ activemq-camel/src/test/java/org/apache/activemq/camel/ activemq-camel/src/test/resources/org/apache/activemq/camel/ activemq-core/src/main/java/org/apache/activemq/

Author: gtully
Date: Fri Jan 28 20:20:06 2011
New Revision: 1064858

URL: http://svn.apache.org/viewvc?rev=1064858&view=rev
Log:
new xa test - jdbc and jms

Added:
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java   (with props)
    activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml   (with props)
Modified:
    activemq/trunk/activemq-camel/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java

Modified: activemq/trunk/activemq-camel/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/pom.xml?rev=1064858&r1=1064857&r2=1064858&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/pom.xml (original)
+++ activemq/trunk/activemq-camel/pom.xml Fri Jan 28 20:20:06 2011
@@ -117,6 +117,52 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.components</groupId>
+      <artifactId>geronimo-transaction</artifactId>
+      <version>2.1</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jencks</groupId>
+      <artifactId>jencks</artifactId>
+      <version>2.2</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jencks</groupId>
+      <artifactId>jencks-amqpool</artifactId>
+      <version>2.2</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-jdbc</artifactId>
+      <version>${camel-version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 

Added: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java?rev=1064858&view=auto
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java (added)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java Fri Jan 28 20:20:06 2011
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.Wait;
+import org.apache.camel.spring.SpringTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.enhydra.jdbc.pool.StandardXAPoolDataSource;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Shows broker heuristic rollback (no prepare memory), hence duplicate message delivery.
+ * <p>
+ * The Camel route in jmsXajdbc.xml consumes from the queue scp_transacted and inserts each
+ * message into the SCP_INPUT_MESSAGES table inside a transaction. The broker is stopped on the
+ * first two-phase commit and then restarted; the test expects a second row to appear.
+ */
+public class JmsJdbcXATest extends SpringTestSupport {
+    private static final Log LOG = LogFactory.getLog(JmsJdbcXATest.class);
+    BrokerService broker = null;
+
+    /**
+     * Creates the SCP_INPUT_MESSAGES table and deletes any rows left in it. A failure of either
+     * statement is logged and ignored, for example when the table already exists.
+     *
+     * @return an open connection obtained from the jdbcEnhydraXaDataSource bean
+     * @throws Exception if the data source bean or a connection cannot be obtained
+     */
+    public java.sql.Connection initDb() throws Exception {
+        String createStatement =
+                "CREATE TABLE SCP_INPUT_MESSAGES (" +
+                        "id int NOT NULL GENERATED ALWAYS AS IDENTITY, " +
+                        "messageId varchar(96) NOT NULL, " +
+                        "messageCorrelationId varchar(96) NOT NULL, " +
+                        "messageContent varchar(2048) NOT NULL, " +
+                        "PRIMARY KEY (id) )";
+
+        StandardXAPoolDataSource pool = getMandatoryBean(StandardXAPoolDataSource.class, "jdbcEnhydraXaDataSource");
+        java.sql.Connection conn = pool.getConnection();
+        executeBestEffort(conn, createStatement, "ex on create tables");
+        executeBestEffort(conn, "DELETE FROM SCP_INPUT_MESSAGES", "ex on create delete all");
+        return conn;
+    }
+
+    /**
+     * Runs one SQL statement, logging instead of propagating any SQLException, and always
+     * closes the Statement so it is not leaked.
+     */
+    private void executeBestEffort(java.sql.Connection conn, String sql, String failureMessage) {
+        Statement statement = null;
+        try {
+            statement = conn.createStatement();
+            statement.execute(sql);
+        } catch (SQLException ex) {
+            LOG.info(failureMessage, ex);
+        } finally {
+            closeQuietly(statement);
+        }
+    }
+
+    /** Closes the statement if there is one; a failure to close is only logged. */
+    private void closeQuietly(Statement statement) {
+        if (statement == null) {
+            return;
+        }
+        try {
+            statement.close();
+        } catch (SQLException ex) {
+            LOG.info("ex on statement close", ex);
+        }
+    }
+
+    /**
+     * Logs every row of SCP_INPUT_MESSAGES.
+     *
+     * @param jdbcConn connection used to query the table
+     * @return the number of rows found
+     */
+    private int dumpDb(java.sql.Connection jdbcConn) throws Exception {
+        int count = 0;
+        Statement statement = jdbcConn.createStatement();
+        try {
+            ResultSet resultSet = statement.executeQuery("SELECT * FROM SCP_INPUT_MESSAGES");
+            while (resultSet.next()) {
+                count++;
+                LOG.info("message - seq:" + resultSet.getInt(1)
+                        + ", id: " + resultSet.getString(2)
+                        + ", corr: " + resultSet.getString(3)
+                        + ", content: " + resultSet.getString(4));
+            }
+        } finally {
+            // this is polled repeatedly; closing the statement also closes its result set
+            statement.close();
+        }
+        return count;
+    }
+
+    /**
+     * Sends one message, lets the first two-phase commit stop the broker, restarts the broker and
+     * waits for the replayed message to produce a second row in the database.
+     */
+    public void testRecovery() throws Exception {
+
+        broker = createBroker(true);
+        broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                                                  TransactionId xid, boolean onePhase) throws Exception {
+                        if (onePhase) {
+                            super.commitTransaction(context, xid, onePhase);
+                        } else {
+                            // die before doing the commit
+                            // so commit will hang as if reply is lost
+                            context.setDontSendReponse(true);
+                            stopBrokerAsync();
+                        }
+                    }
+                }
+        });
+        broker.start();
+
+        final java.sql.Connection jdbcConn = initDb();
+        sendTestMessage();
+
+        LOG.info("waiting for route to kick in, it will kill the broker on first 2pc commit");
+        // will be stopped by the plugin on first 2pc commit
+        broker.waitUntilStopped();
+        assertEquals("message in db, commit to db worked", 1, dumpDb(jdbcConn));
+
+        LOG.info("Broker stopped, restarting...");
+        broker = createBroker(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        LOG.info("waiting for completion or route with replayed message");
+        assertTrue("got a second message in the db", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 2 == dumpDb(jdbcConn);
+            }
+        }));
+        assertEquals("message in db", 2, dumpDb(jdbcConn));
+    }
+
+    /**
+     * Stops the current broker on a separate thread.
+     */
+    private void stopBrokerAsync() {
+        ExecutorService stopper = Executors.newSingleThreadExecutor();
+        stopper.execute(new Runnable() {
+            public void run() {
+                LOG.info("Stopping broker post commit...");
+                try {
+                    broker.stop();
+                } catch (Exception e) {
+                    LOG.error("Failed to stop broker", e);
+                }
+            }
+        });
+        // accept no further tasks so the worker thread exits once the stop has run
+        stopper.shutdown();
+    }
+
+    /**
+     * Sends a single text message to the queue scp_transacted over a vm://testXA connection and
+     * closes that connection again.
+     */
+    private void sendTestMessage() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
+            TextMessage message = session.createTextMessage("Some Text");
+            message.setJMSCorrelationID("pleaseCorrelate");
+            producer.send(message);
+        } finally {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                // the message has been sent (or the send failure is propagating); only log
+                LOG.info("ex on producer connection close", e);
+            }
+        }
+    }
+
+    /**
+     * Builds, without starting, a broker named testXA with a connector on tcp://0.0.0.0:61616;
+     * jmsXajdbc.xml points its XA connection factory at that port.
+     *
+     * @param deleteAllMessages whether the broker deletes all messages when it starts
+     */
+    private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setBrokerName("testXA");
+        brokerService.setAdvisorySupport(false);
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectory("target/data");
+        brokerService.addConnector("tcp://0.0.0.0:61616");
+        return brokerService;
+    }
+
+    /** Loads the Spring context that defines the route, transaction manager and data source. */
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbc.xml");
+    }
+}

Propchange: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml?rev=1064858&view=auto
==============================================================================
--- activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml (added)
+++ activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml Fri Jan 28 20:20:06 2011
@@ -0,0 +1,120 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+    ">
+
+        <!-- build broker in code so it can be restarted and modified to test recovery -->
+
+        <!-- some logging that can help
+          log4j.logger.org.apache.activemq.TransactionContext=TRACE
+          log4j.logger.org.springframework.transaction.support.AbstractPlatformTransactionManager=TRACE
+          log4j.logger.org.apache.geronimo.transaction.manager=TRACE
+          log4j.logger.org.enhydra.jdbc=TRACE
+        -->
+
+		<!-- XID factory -->
+		<bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl" />
+
+		<!-- Transaction log -->
+		<bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">	
+			<property name="logFileDir" value="target/data/howl/txlog"/>
+			<property name="xidFactory" ref="xidFactory"/>
+		</bean>
+
+		<!-- Setup the geronimo transaction manager -->
+		<bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
+			<property name="transactionLog" ref="transactionLog"/>
+		</bean>
+
+		<!-- Configure the Spring framework to use JTA transactions from Geronimo -->
+		<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
+			<property name="transactionManager" ref="jenckTransactionManager"/>
+		</bean>
+
+		<!-- Using the jencks ActiveMQ pool to enable XA -->
+		<bean id="jmsXaConnectionFactory" class="org.jencks.amqpool.XaPooledConnectionFactory">
+	    	<constructor-arg value="tcp://localhost:61616" />
+	    	<property name="maxConnections" value="8" />
+	    	<property name="transactionManager" ref="jenckTransactionManager" />
+		</bean>
+
+		<!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
+		<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
+			<property name="transacted" value="true"/>
+			<property name="transactionManager" ref="jtaTransactionManager"/>
+			<property name="connectionFactory" ref="jmsXaConnectionFactory"/>
+            <!-- Cache level is important: it can be "cache connection" or "none". Sessions must be enlisted
+            in the current transaction, so they cannot be cached; with the default (cache sessions) they are
+            created up front, before the transaction (required for the route) exists. -->
+            <property name="cacheLevel" value="0" />
+		</bean>
+
+	  <!-- Setup the connection manager -->
+	  <bean id="connectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
+	    <property name="transactionManager" ref="jenckTransactionManager" />
+	    <property name="transaction" value="xa" />
+	  </bean>
+
+	  <!-- Setup the JDBC Managed Connection Factory (that supports XA) -->
+	  <!--bean id="jdbcManagedConnectionFactory" class="org.jencks.tranql.XAPoolDataSourceMCF">
+	    <property name="driverName" value="com.mysql.jdbc.Driver"/>
+	    <property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true"/>
+	    <property name="user" value="rails"/>
+	    <property name="password" value="rails"/>
+	  </bean -->
+	
+	  <bean id="jdbcEnhydraXaDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
+	        <property name="dataSource">
+	            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
+
+	                <!-- property name="driverName" value="com.mysql.jdbc.Driver" />
+	                <property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true" / -->
+                    <!-- try embedded derby xa -->
+                    <property name="driverName" value="org.apache.derby.jdbc.EmbeddedDriver" />
+                    <property name="url" value="jdbc:derby:target/XatestDs;create=true" />
+                    <property name="transactionManager" ref="jenckTransactionManager" />
+	            </bean>
+	        </property>
+            <property name="transactionManager" ref="jenckTransactionManager" />
+	   </bean>
+
+		<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+			<property name="transactionManager" ref="jenckTransactionManager"/>
+			<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
+		</bean>
+
+		<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+				<route id="queueToDbTransacted">
+					<from uri="activemq:queue:scp_transacted"/>
+                    <transacted ref="required"/>
+					<convertBodyTo type="java.lang.String"/>
+					<to uri="log:BeforeSettingBody?showAll=true"/>
+					<setBody>
+						<simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent) VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')</simple>
+					</setBody>
+					<to uri="jdbc:jdbcEnhydraXaDataSource"/>
+				</route>
+			</camelContext>
+
+</beans>

Propchange: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1064858&r1=1064857&r2=1064858&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Fri Jan 28 20:20:06 2011
@@ -502,7 +502,7 @@ public class TransactionContext implemen
     public void commit(Xid xid, boolean onePhase) throws XAException {
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Commit: " + xid);
+            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
         }
         
         // We allow interleaving multiple transactions, so
@@ -534,14 +534,16 @@ public class TransactionContext implemen
 
         } catch (JMSException e) {
             LOG.warn("commit of: " + x + " failed with: " + e, e);
-            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-            if (l != null && !l.isEmpty()) {
-                for (TransactionContext ctx : l) {
-                    try {
-                        ctx.afterRollback();
-                    } catch (Throwable ignored) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
+            if (onePhase) {
+                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                if (l != null && !l.isEmpty()) {
+                    for (TransactionContext ctx : l) {
+                        try {
+                            ctx.afterRollback();
+                        } catch (Throwable ignored) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
+                            }
                         }
                     }
                 }
@@ -746,4 +748,11 @@ public class TransactionContext implemen
         associatedXid = null;
         transactionId = null;
     }
+
+    @Override
+    public String toString() {
+        return "TransactionContext{" +
+                "transactionId=" + transactionId +
+                '}';
+    }
 }