You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/05/06 15:27:50 UTC

svn commit: r1100208 - in /activemq/trunk: ./ activemq-camel/ activemq-camel/src/test/java/org/apache/activemq/camel/ activemq-camel/src/test/resources/org/apache/activemq/camel/ activemq-core/ activemq-core/src/main/java/org/apache/activemq/ activemq-...

Author: gtully
Date: Fri May  6 13:27:49 2011
New Revision: 1100208

URL: http://svn.apache.org/viewvc?rev=1100208&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3305 - Implement "exactly once" delivery with kahaDB and XA in the event of a failure post prepare.
Fixed up transaction broker recovery processing and kahadb store such that pending recovered
messages and acks wait for and respect the eventual xa transction outcome. Essentially
implementing exactly once delivery semantics on failure. Updated the camel jms to jdbc
test route to validate correct failure recovery processing with geronimo, test pulls in
xa wrappers from jencks and openejb such that NamedXAResources are registered with geronimo.
Additional unit tests added.

Modified:
    activemq/trunk/activemq-camel/pom.xml
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java
    activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-camel/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/pom.xml?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/pom.xml (original)
+++ activemq/trunk/activemq-camel/pom.xml Fri May  6 13:27:49 2011
@@ -132,7 +132,20 @@
     <dependency>
       <groupId>org.apache.geronimo.components</groupId>
       <artifactId>geronimo-transaction</artifactId>
-      <version>2.1</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.objectweb.howl</groupId>
+      <artifactId>howl</artifactId>
+      <version>1.0.1-1</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.openejb</groupId>
+      <artifactId>openejb-core</artifactId>
+      <version>3.1.2</version>
       <optional>true</optional>
       <scope>test</scope>
     </dependency>
@@ -144,6 +157,18 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-spring</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-ra</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.jencks</groupId>
       <artifactId>jencks-amqpool</artifactId>
       <version>2.2</version>

Modified: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java (original)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXATest.java Fri May  6 13:27:49 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.camel;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -33,18 +32,19 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.util.Wait;
 import org.apache.camel.spring.SpringTestSupport;
+import org.apache.commons.dbcp.BasicDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.enhydra.jdbc.pool.StandardXAPoolDataSource;
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 /**
- *  shows broker heuristic rollback (no prepare memory), hence duplicate message delivery
+ *  shows broker 'once only delivery' and recovery with XA
  */
 public class JmsJdbcXATest extends SpringTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXATest.class);
     BrokerService broker = null;
+    int messageCount;
 
     public java.sql.Connection initDb() throws Exception {
         String createStatement =
@@ -55,9 +55,7 @@ public class JmsJdbcXATest extends Sprin
                         "messageContent varchar(2048) NOT NULL, " +
                         "PRIMARY KEY (id) )";
 
-        java.sql.Connection conn = null;
-        StandardXAPoolDataSource pool = getMandatoryBean(StandardXAPoolDataSource.class, "jdbcEnhydraXaDataSource");
-        conn = pool.getConnection();
+        java.sql.Connection conn = getJDBCConnection();
         try {
             conn.createStatement().execute(createStatement);
         } catch (SQLException alreadyExists) {
@@ -73,6 +71,11 @@ public class JmsJdbcXATest extends Sprin
         return conn;
     }
 
+    private java.sql.Connection getJDBCConnection() throws Exception {
+        BasicDataSource dataSource = getMandatoryBean(BasicDataSource.class, "managedDataSourceWithRecovery");
+        return dataSource.getConnection();
+    }
+
     private int dumpDb(java.sql.Connection jdbcConn) throws Exception {
         int count = 0;
         ResultSet resultSet = jdbcConn.createStatement().executeQuery("SELECT * FROM SCP_INPUT_MESSAGES");
@@ -86,48 +89,10 @@ public class JmsJdbcXATest extends Sprin
         return count;
     }
 
-    public void testRecovery() throws Exception {
-
-        broker = createBroker(true);
-        broker.setPlugins(new BrokerPlugin[]{
-                new BrokerPluginSupport() {
-                    @Override
-                    public void commitTransaction(ConnectionContext context,
-                                                  TransactionId xid, boolean onePhase) throws Exception {
-                        if (onePhase) {
-                            super.commitTransaction(context, xid, onePhase);
-                        } else {
-                            // die before doing the commit
-                            // so commit will hang as if reply is lost
-                            context.setDontSendReponse(true);
-                            Executors.newSingleThreadExecutor().execute(new Runnable() {
-                                public void run() {
-                                    LOG.info("Stopping broker post commit...");
-                                    try {
-                                        broker.stop();
-                                    } catch (Exception e) {
-                                        e.printStackTrace();
-                                    }
-                                }
-                            });
-                        }
-                    }
-                }
-        });
-        broker.start();
-
-        final java.sql.Connection jdbcConn = initDb();
-
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
-        factory.setWatchTopicAdvisories(false);
-        Connection connection = factory.createConnection();
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
-        TextMessage message = session.createTextMessage("Some Text");
-        message.setJMSCorrelationID("pleaseCorrelate");
-        producer.send(message);
+    public void testRecoveryCommit() throws Exception {
+        java.sql.Connection jdbcConn = initDb();
 
+        sendJMSMessageToKickOffRoute();
         LOG.info("waiting for route to kick in, it will kill the broker on first 2pc commit");
         // will be stopped by the plugin on first 2pc commit
         broker.waitUntilStopped();
@@ -137,15 +102,51 @@ public class JmsJdbcXATest extends Sprin
         broker = createBroker(false);
         broker.start();
         broker.waitUntilStarted();
+        assertEquals("pending transactions", 1, broker.getBroker().getPreparedTransactions(null).length);
+
+        // TM stays actively committing first message ack which won't get redelivered - xa once only delivery
+        LOG.info("waiting for recovery to complete");
+        assertTrue("recovery complete in time", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getBroker().getPreparedTransactions(null).length == 0;
+            }
+        }));
+        // verify recovery complete
+        assertEquals("recovery complete", 0, broker.getBroker().getPreparedTransactions(null).length);
+
+        final java.sql.Connection freshConnection = getJDBCConnection();
+        assertTrue("did not get replay", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == dumpDb(freshConnection);
+            }
+        }));
+        assertEquals("still one message in db", 1, dumpDb(freshConnection));
+
+        // let once complete ok
+        sendJMSMessageToKickOffRoute();
 
-        LOG.info("waiting for completion or route with replayed message");
-        assertTrue("got a second message in the db", Wait.waitFor(new Wait.Condition() {
+        assertTrue("got second message", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return 2 == dumpDb(jdbcConn);
+                return 2 == dumpDb(freshConnection);
             }
         }));
-        assertEquals("message in db", 2, dumpDb(jdbcConn));
+        assertEquals("two messages in db", 2, dumpDb(freshConnection));
+    }
+
+    private void sendJMSMessageToKickOffRoute() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
+        TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
+        message.setJMSCorrelationID("pleaseCorrelate");
+        producer.send(message);
+        connection.close();
     }
 
     private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
@@ -161,6 +162,42 @@ public class JmsJdbcXATest extends Sprin
 
     @Override
     protected AbstractXmlApplicationContext createApplicationContext() {
+
+        deleteDirectory("target/data/howl");
+
+        // make broker available to recovery processing on app context start
+        try {
+            broker = createBroker(true);
+            broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                                                  TransactionId xid, boolean onePhase) throws Exception {
+                        if (onePhase) {
+                            super.commitTransaction(context, xid, onePhase);
+                        } else {
+                            // die before doing the commit
+                            // so commit will hang as if reply is lost
+                            context.setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                public void run() {
+                                    LOG.info("Stopping broker post commit...");
+                                    try {
+                                        broker.stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            });
+            broker.start();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to start broker", e);
+        }
+
         return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbc.xml");
     }
 }

Modified: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml (original)
+++ activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbc.xml Fri May  6 13:27:49 2011
@@ -14,107 +14,102 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<!-- START SNIPPET: example -->
 
+<!-- START SNIPPET: jms_jdbc_xa -->
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="
-       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
     ">
 
-        <!-- build broker in code so it can be restarted and modified to test recovery -->
+    <context:annotation-config />
+    <!-- broker creation in code so it can be restarted and modified to test recovery -->
 
-        <!-- some logging that can help
-          log4j.logger.org.apache.activemq.TransactionContext=TRACE
-          log4j.logger.org.springframework.transaction.support.AbstractPlatformTransactionManager=TRACE
-          log4j.logger.org.apache.geronimo.transaction.manager=TRACE
-          log4j.logger.org.enhydra.jdbc=TRACE
-        -->
-
-		<!-- XID factory -->
-		<bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl" />
-
-		<!-- Transaction log -->
-		<bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">	
-			<property name="logFileDir" value="target/data/howl/txlog"/>
-			<property name="xidFactory" ref="xidFactory"/>
-		</bean>
-
-		<!-- Setup the geronimo transaction manager -->
-		<bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
-			<property name="transactionLog" ref="transactionLog"/>
-		</bean>
-
-		<!-- Configure the Spring framework to use JTA transactions from Geronimo -->
-		<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
-			<property name="transactionManager" ref="jenckTransactionManager"/>
-		</bean>
-
-		<!-- Using the jencks ActiveMQ pool to enable XA -->
-		<bean id="jmsXaConnectionFactory" class="org.jencks.amqpool.XaPooledConnectionFactory">
-	    	<constructor-arg value="tcp://localhost:61616" />
-	    	<property name="maxConnections" value="8" />
-	    	<property name="transactionManager" ref="jenckTransactionManager" />
-		</bean>
-
-		<!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
-		<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
-			<property name="transacted" value="true"/>
-			<property name="transactionManager" ref="jtaTransactionManager"/>
-			<property name="connectionFactory" ref="jmsXaConnectionFactory"/>
-            <!-- cache level is important, can be cache connection or none, as session needs to be enlisted
-            in the current transaction they can't be cached, with default cache sessions, they are created
-            up front, before the transaction (required for the route) -->
-            <property name="cacheLevel" value="0" />
-		</bean>
-
-	  <!-- Setup the connection manager -->
-	  <bean id="connectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
-	    <property name="transactionManager" ref="jenckTransactionManager" />
-	    <property name="transaction" value="xa" />
-	  </bean>
-
-	  <!-- Setup the JDBC Managed Connection Factory (that supports XA) -->
-	  <!--bean id="jdbcManagedConnectionFactory" class="org.jencks.tranql.XAPoolDataSourceMCF">
-	    <property name="driverName" value="com.mysql.jdbc.Driver"/>
-	    <property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true"/>
-	    <property name="user" value="rails"/>
-	    <property name="password" value="rails"/>
-	  </bean -->
-	
-	  <bean id="jdbcEnhydraXaDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
-	        <property name="dataSource">
-	            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
-
-	                <!-- property name="driverName" value="com.mysql.jdbc.Driver" />
-	                <property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true" / -->
-                    <!-- try embedded derby xa -->
-                    <property name="driverName" value="org.apache.derby.jdbc.EmbeddedDriver" />
-                    <property name="url" value="jdbc:derby:target/XatestDs;create=true" />
-                    <property name="transactionManager" ref="jenckTransactionManager" />
-	            </bean>
-	        </property>
-            <property name="transactionManager" ref="jenckTransactionManager" />
-	   </bean>
-
-		<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
-			<property name="transactionManager" ref="jenckTransactionManager"/>
-			<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
-		</bean>
-
-		<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
-				<route id="queueToDbTransacted">
-					<from uri="activemq:queue:scp_transacted"/>
-                    <transacted ref="required"/>
-					<convertBodyTo type="java.lang.String"/>
-					<to uri="log:BeforeSettingBody?showAll=true"/>
-					<setBody>
-						<simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent) VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')</simple>
-					</setBody>
-					<to uri="jdbc:jdbcEnhydraXaDataSource"/>
-				</route>
-			</camelContext>
+    <!-- use jencks factory beans to easily configure howl and geronimo transaction manager -->
+    <bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl"/>
+    <!-- Transaction log -->
+    <bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">
+        <property name="logFileDir" value="target/data/howl/txlog"/>
+        <property name="xidFactory" ref="xidFactory"/>
+    </bean>
+    <bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
+        <property name="transactionLog" ref="transactionLog"/>
+    </bean>
+
+    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="tcp://localhost:61616?jms.dispatchAsync=false"/>
+    </bean>
+
+    <!-- register ActiveMQ with Geronimo to allow out of band transaction recovery/completion on a new connection
+         the resourceName gives the ActiveMQ XAResource an identity, Geronimo NamedXAResource in the transaction log
+    -->
+    <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
+        <property name="transactionManager" ref="jenckTransactionManager"/>
+        <property name="connectionFactory" ref="activemqConnectionFactory"/>
+        <property name="resourceName" value="activemq.broker"/>
+    </bean>
+
+    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean"
+          depends-on="jenckTransactionManager">
+        <property name="maxConnections" value="1"/>
+        <property name="transactionManager" ref="jenckTransactionManager"/>
+        <property name="connectionFactory" ref="activemqConnectionFactory"/>
+        <property name="resourceName" value="activemq.broker"/>
+    </bean>
+
+    <!-- Configure the Spring framework (used by camel) to use JTA transactions from Geronimo -->
+    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
+        <property name="transactionManager" ref="jenckTransactionManager"/>
+    </bean>
+
+    <!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
+    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"
+          depends-on="pooledConnectionFactory">
+        <property name="transacted" value="true"/>
+        <property name="transactionManager" ref="jtaTransactionManager"/>
+        <property name="connectionFactory" ref="pooledConnectionFactory"/>
+        <!-- cache level is important, can be cache connection or none, as session needs to be enlisted
+             in the current transaction they can't be cached, with default cache sessions, they are created
+             up front, before the transaction (required for the route) -->
+        <property name="cacheLevel" value="0"/>
+    </bean>
+
+    <!-- openejb provides geronimo NamedXAResources wrapper around commons dbcp such that they have an identity in the howl log -->
+    <bean id="geronimoXAResourceWrapper"
+          class="org.apache.openejb.resource.GeronimoTransactionManagerFactory.GeronimoXAResourceWrapper"/>
+    <bean id="managedDataSourceWithRecovery" class="org.apache.openejb.resource.jdbc.ManagedDataSourceWithRecovery">
+        <constructor-arg>
+            <ref bean="geronimoXAResourceWrapper"></ref>
+        </constructor-arg>
+        <property name="jdbcDriver" value="org.apache.derby.jdbc.EmbeddedDriver"/>
+        <property name="jdbcUrl" value="jdbc:derby:target/XatestDs;create=true"/>
+        <property name="transactionManager" ref="jenckTransactionManager"/>
+    </bean>
+
+    <bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+        <property name="transactionManager" ref="jenckTransactionManager"/>
+        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
+    </bean>
+
+    <!-- the route, from jms to jdbc in an xa transaction -->
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route id="queueToDbTransacted">
+            <from uri="activemq:queue:scp_transacted"/>
+            <transacted ref="required"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="log:BeforeSettingBody?showAll=true"/>
+            <setBody>
+                <simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent)
+                    VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')
+                </simple>
+            </setBody>
+            <to uri="jdbc:managedDataSourceWithRecovery"/>
+        </route>
+    </camelContext>
 
 </beans>
+<!-- END SNIPPET: jms_jdbc_xa -->

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri May  6 13:27:49 2011
@@ -437,10 +437,6 @@
             <!--  these seem to fail only in m2 -->
             <!--<exclude>**/TransactedTopicMasterSlaveTest.*</exclude>-->
 
-            <!-- Kaha in flux - removing tests -->
-            <exclude>**/KahaDBStoreXARecoveryBrokerTest.*</exclude>
-            <exclude>**/KahaDBStoreRecoveryBrokerTest.*</exclude>
-
             <!-- Multicast and UDP based tests fail on GBuild -->
             <exclude>**/PeerTransportTest.*</exclude>
             <exclude>**/MulticastTransportTest.*</exclude>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Fri May  6 13:27:49 2011
@@ -135,17 +135,21 @@ public class TransactionContext implemen
             return;
         }
 
+        Throwable firstException = null;
         int size = synchronizations.size();
-        try {
-            for (int i = 0; i < size; i++) {
+        for (int i = 0; i < size; i++) {
+            try {
                 synchronizations.get(i).afterRollback();
+            } catch (Throwable t) {
+                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
+                if (firstException == null) {
+                    firstException = t;
+                }
             }
-        } catch (JMSException e) {
-            throw e;
-        } catch (Throwable e) {
-            throw JMSExceptionSupport.create(e);
-        } finally {
-            synchronizations = null;
+        }
+        synchronizations = null;
+        if (firstException != null) {
+            throw JMSExceptionSupport.create(firstException);
         }
     }
 
@@ -154,17 +158,21 @@ public class TransactionContext implemen
             return;
         }
 
+        Throwable firstException = null;
         int size = synchronizations.size();
-        try {
-            for (int i = 0; i < size; i++) {
+        for (int i = 0; i < size; i++) {
+            try {
                 synchronizations.get(i).afterCommit();
+            } catch (Throwable t) {
+                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
+                if (firstException == null) {
+                    firstException = t;
+                }
             }
-        } catch (JMSException e) {
-            throw e;
-        } catch (Throwable e) {
-            throw JMSExceptionSupport.create(e);
-        } finally {
-            synchronizations = null;
+        }
+        synchronizations = null;
+        if (firstException != null) {
+            throw JMSExceptionSupport.create(firstException);
         }
     }
 
@@ -528,7 +536,11 @@ public class TransactionContext implemen
             List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
             if (l != null && !l.isEmpty()) {
                 for (TransactionContext ctx : l) {
-                    ctx.afterCommit();
+                    try {
+                        ctx.afterCommit();
+                    } catch (Exception ignored) {
+                        LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
+                    }
                 }
             }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri May  6 13:27:49 2011
@@ -17,13 +17,21 @@
 package org.apache.activemq.broker;
 
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BaseCommand;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
@@ -90,13 +98,15 @@ public class TransactionBroker extends B
                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                     try {
                         beginTransaction(context, xid);
+                        Transaction transaction = getTransaction(context, xid, false);
                         for (int i = 0; i < addedMessages.length; i++) {
-                            send(producerExchange, addedMessages[i]);
+                            kickDestinationOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
                         }
                         for (int i = 0; i < aks.length; i++) {
-                            acknowledge(consumerExchange, aks[i]);
+                            kickDestinationOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
                         }
-                        prepareTransaction(context, xid);
+                        transaction.setState(Transaction.PREPARED_STATE);
+                        LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
                     } catch (Throwable e) {
                         throw new WrappedException(e);
                     }
@@ -109,6 +119,64 @@ public class TransactionBroker extends B
         next.start();
     }
 
+    private void kickDestinationOnCompletion(ConnectionContext context, Transaction transaction,
+                                             ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
+        Destination destination =  addDestination(context, amqDestination, false);
+        registerSync(destination, transaction, ack);
+    }
+
+    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
+        if (destination instanceof Queue) {
+            Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
+            // ensure one per destination in the list
+            transaction.removeSynchronization(sync);
+            transaction.addSynchronization(sync);
+        }
+    }
+
+    static class PreparedDestinationCompletion extends Synchronization {
+        final Queue queue;
+        final boolean messageSend;
+        public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
+            this.queue = queue;
+            // rollback relevant to acks, commit to sends
+            this.messageSend = messageSend;
+        }
+
+        @Override
+        public int hashCode() {
+            return System.identityHashCode(queue) +
+                    System.identityHashCode(Boolean.valueOf(messageSend));
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof PreparedDestinationCompletion &&
+                    queue.equals(((PreparedDestinationCompletion) other).queue) &&
+                    messageSend == ((PreparedDestinationCompletion) other).messageSend;
+        }
+
+        @Override
+        public void afterRollback() throws Exception {
+            if (!messageSend) {
+                queue.clearPendingMessages();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("cleared pending from afterRollback : " + queue);
+                }
+            }
+        }
+
+        @Override
+        public void afterCommit() throws Exception {
+            if (messageSend) {
+                queue.clearPendingMessages();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("cleared pending from afterCommit : " + queue);
+                }
+            }
+        }
+    }
+
     public void stop() throws Exception {
         transactionStore.stop();
         next.stop();
@@ -135,7 +203,7 @@ public class TransactionBroker extends B
         XATransactionId rc[] = new XATransactionId[txs.size()];
         txs.toArray(rc);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("prepared transacton list size: " + rc.length);
+            LOG.debug("prepared transaction list size: " + rc.length);
         }
         return rc;
     }
@@ -253,7 +321,7 @@ public class TransactionBroker extends B
             // first find all txs that belongs to the connection
             ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
             for (XATransaction tx : xaTransactions.values()) {
-                if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
+                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                     txs.add(tx);
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri May  6 13:27:49 2011
@@ -1132,6 +1132,19 @@ public class Queue extends BaseDestinati
         getMessages().clear();
     }
 
+    public void clearPendingMessages() {
+        messagesLock.writeLock().lock();
+        try {
+            if (store != null) {
+                store.resetBatching();
+            }
+            messages.gc();
+            asyncWakeup();
+        } finally {
+            messagesLock.writeLock().unlock();
+        }
+    }
+
     /**
      * Removes the message matching the given messageId
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Fri May  6 13:27:49 2011
@@ -56,13 +56,16 @@ public abstract class AbstractStoreCurso
             clear();
             super.start();      
             resetBatch();
-            this.size = getStoreSize();
-            this.storeHasMessages=this.size > 0;
+            resetSize();
             setCacheEnabled(!this.storeHasMessages&&useCache);
         } 
     }
-    
-    
+
+    protected void resetSize() {
+        this.size = getStoreSize();
+        this.storeHasMessages=this.size > 0;
+    }
+
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -237,6 +240,7 @@ public abstract class AbstractStoreCurso
         batchList.clear();
         clearIterator(false);
         batchResetNeeded = true;
+        resetSize();
         setCacheEnabled(false);
     }
 
@@ -287,8 +291,9 @@ public abstract class AbstractStoreCurso
         return size;
     }
 
+    @Override
     public String toString() {
-        return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+        return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri May  6 13:27:49 2011
@@ -97,9 +97,4 @@ class QueueStorePrefetch extends Abstrac
         this.store.recoverNextMessages(this.maxBatchSize, this);
     }
 
-    @Override
-    public String toString() {
-        return "QueueStorePrefetch" + System.identityHashCode(this);
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java Fri May  6 13:27:49 2011
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.command;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import javax.transaction.xa.Xid;
 import org.apache.activemq.util.HexSupport;
@@ -34,6 +35,7 @@ public class XATransactionId extends Tra
 
     private transient int hash;
     private transient String transactionKey;
+    private transient ArrayList<MessageAck> preparedAcks;
 
     public XATransactionId() {
     }
@@ -50,8 +52,17 @@ public class XATransactionId extends Tra
 
     public synchronized String getTransactionKey() {
         if (transactionKey == null) {
-            transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBytes(globalTransactionId) + ":"
-                             + HexSupport.toHexFromBytes(branchQualifier);
+            StringBuffer s = new StringBuffer();
+            s.append("XID:[globalId=");
+            for (int i = 0; i < globalTransactionId.length; i++) {
+                s.append(Integer.toHexString(globalTransactionId[i]));
+            }
+            s.append(",branchId=");
+            for (int i = 0; i < branchQualifier.length; i++) {
+                s.append(Integer.toHexString(branchQualifier[i]));
+            }
+            s.append("]");
+            transactionKey = s.toString();
         }
         return transactionKey;
     }
@@ -141,4 +152,11 @@ public class XATransactionId extends Tra
         return getTransactionKey().compareTo(xid.getTransactionKey());
     }
 
+    public void setPreparedAcks(ArrayList<MessageAck> preparedAcks) {
+        this.preparedAcks = preparedAcks;
+    }
+
+    public ArrayList<MessageAck> getPreparedAcks() {
+        return preparedAcks;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri May  6 13:27:49 2011
@@ -461,6 +461,9 @@ public class KahaDBStore extends Message
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
+                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
                             Message msg = loadMessage(entry.getValue().location);
                             listener.recoverMessage(msg);
                         }
@@ -483,6 +486,9 @@ public class KahaDBStore extends Message
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
                              listener.hasSpace() && iterator.hasNext(); ) {
                             entry = iterator.next();
+                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
                             Message msg = loadMessage(entry.getValue().location);
                             listener.recoverMessage(msg);
                             counter++;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri May  6 13:27:49 2011
@@ -26,7 +26,6 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import javax.transaction.xa.XAException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -268,6 +267,7 @@ public class KahaDBTransactionStore impl
                 // ensure message order w.r.t to cursor and store for setBatch()
                 synchronized (this) {
                     theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+                    forgetRecoveredAcks(txid);
                 }
             }
         }else {
@@ -283,11 +283,19 @@ public class KahaDBTransactionStore impl
         if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
             KahaTransactionInfo info = getTransactionInfo(txid);
             theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
+            forgetRecoveredAcks(txid);
         } else {
             inflightTransactions.remove(txid);
         }
     }
 
+    protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
+        if (txid.isXATransaction()) {
+            XATransactionId xaTid = ((XATransactionId) txid);
+            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
+        }
+    }
+
     public void start() throws Exception {
     }
 
@@ -295,8 +303,6 @@ public class KahaDBTransactionStore impl
     }
 
     public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
-        // All the inflight transactions get rolled back..
-        // inflightTransactions.clear();
         for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
             XATransactionId xid = (XATransactionId) entry.getKey();
             ArrayList<Message> messageList = new ArrayList<Message>();
@@ -320,6 +326,8 @@ public class KahaDBTransactionStore impl
             MessageAck[] acks = new MessageAck[ackList.size()];
             messageList.toArray(addedMessages);
             ackList.toArray(acks);
+            xid.setPreparedAcks(ackList);
+            theStore.trackRecoveredAcks(ackList);
             listener.recover(xid, addedMessages, acks);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri May  6 13:27:49 2011
@@ -36,8 +36,10 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
@@ -1719,7 +1721,35 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
- 
+    protected final Set<String> ackedAndPrepared = new HashSet<String>();
+
+    // messages that have prepared (pending) acks cannot be redispatched unless the outcome is rollback,
+    // till then they are skipped by the store.
+    // 'at most once' XA guarantee
+    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
+        this.indexLock.writeLock().lock();
+        try {
+            for (MessageAck ack : acks) {
+                ackedAndPrepared.add(ack.getLastMessageId().toString());
+            }
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
+
+    public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
+        if (acks != null) {
+            this.indexLock.writeLock().lock();
+            try {
+                for (MessageAck ack : acks) {
+                    ackedAndPrepared.remove(ack.getLastMessageId().toString());
+                }
+            } finally {
+                this.indexLock.writeLock().unlock();
+            }
+        }
+    }
+
     private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
         TransactionId key = key(info);
         List<Operation> tx;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Fri May  6 13:27:49 2011
@@ -52,7 +52,7 @@ public class XATransaction extends Trans
     @Override
     public void commit(boolean onePhase) throws XAException, IOException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("XA Transaction commit: " + xid);
+            LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
         }
 
         switch (getState()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java Fri May  6 13:27:49 2011
@@ -19,13 +19,13 @@ package org.apache.activemq.broker;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.util.IOHelper;
 
 public class BrokerRestartTestSupport extends BrokerTestSupport {
 
-    private PersistenceAdapter persistenceAdapter;
-
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
@@ -33,9 +33,8 @@ public class BrokerRestartTestSupport ex
         if (dir != null) {
             IOHelper.deleteChildren(dir);
         }
-        //broker.setPersistent(false);
         broker.setDeleteAllMessagesOnStartup(true);
-        persistenceAdapter = broker.getPersistenceAdapter();
+        configureBroker(broker);
         return broker;
     }
 
@@ -45,10 +44,13 @@ public class BrokerRestartTestSupport ex
      */
     protected BrokerService createRestartedBroker() throws Exception {
         BrokerService broker = new BrokerService();
-        //broker.setPersistenceAdapter(persistenceAdapter);
+        configureBroker(broker);
         return broker;
     }
 
+    protected void configureBroker(BrokerService broker) {
+    }
+
     /**
      * Simulates a broker restart. The memory based persistence adapter is
      * reused so that it does not "loose" it's "persistent" messages.

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Fri May  6 13:27:49 2011
@@ -543,13 +543,15 @@ public class RecoveryBrokerTest extends 
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+        Message m = null;
         for (int i = 0; i < NUMBER; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
-            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection.send(ack);
         }
+        MessageAck ack = createAck(consumerInfo, m, NUMBER, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
         // Don't commit
 
         // restart the broker.
@@ -566,7 +568,7 @@ public class RecoveryBrokerTest extends 
 
         // All messages should be re-delivered.
         for (int i = 0; i < NUMBER; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
         }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri May  6 13:27:49 2011
@@ -18,6 +18,8 @@ package org.apache.activemq.broker;
 
 import junit.framework.Test;
 
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConnectionInfo;
@@ -31,6 +33,7 @@ import org.apache.activemq.command.Sessi
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.v5.MessageMarshaller;
 
 /**
  * Used to simulate the recovery that occurs when a broker shuts down.
@@ -70,13 +73,14 @@ public class XARecoveryBrokerTest extend
         }
 
         // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
         assertNoMessagesLeft(connection);
         connection.request(closeConnectionInfo(connectionInfo));
 
         // restart the broker.
         restartBroker();
 
-        // Setup the consumer and receive the message.
+        // Setup the consumer and try receive the message.
         connection = createConnection();
         connectionInfo = createConnectionInfo();
         sessionInfo = createSessionInfo(connectionInfo);
@@ -86,6 +90,7 @@ public class XARecoveryBrokerTest extend
         connection.send(consumerInfo);
 
         // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
         assertNoMessagesLeft(connection);
 
         Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
@@ -98,7 +103,7 @@ public class XARecoveryBrokerTest extend
             connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
         }
 
-        // We should not get the committed transactions.
+        // We should get the committed transactions.
         for (int i = 0; i < 4; i++) {
             Message m = receiveMessage(connection);
             assertNotNull(m);
@@ -180,13 +185,16 @@ public class XARecoveryBrokerTest extend
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+        Message m = null;
         for (int i = 0; i < 4; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
-            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection.send(ack);
         }
+
+        MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
         // Commit
         connection.request(createCommitTransaction1Phase(connectionInfo, txid));
 
@@ -205,7 +213,7 @@ public class XARecoveryBrokerTest extend
         // No messages should be delivered.
         assertNoMessagesLeft(connection);
 
-        Message m = receiveMessage(connection);
+        m = receiveMessage(connection);
         assertNull(m);
     }
     
@@ -235,35 +243,133 @@ public class XARecoveryBrokerTest extend
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+        Message m = null;
         for (int i = 0; i < 4; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
-            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection.send(ack);
         }
-        
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
         connection.request(createPrepareTransaction(connectionInfo, txid));
 
         // restart the broker.
         restartBroker();
 
-        // Setup the consumer and receive the message.
         connection = createConnection();
         connectionInfo = createConnectionInfo();
-        sessionInfo = createSessionInfo(connectionInfo);
         connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
         connection.send(sessionInfo);
         consumerInfo = createConsumerInfo(sessionInfo, destination);
         connection.send(consumerInfo);
         
-        // All messages should be re-delivered.
+        // no redelivery, exactly once semantics unless there is rollback
+        m = receiveMessage(connection);
+        assertNull(m);
+        assertNoMessagesLeft(connection);
+
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+    }
+
+    public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
         for (int i = 0; i < 4; i++) {
-            Message m = receiveMessage(connection);
-            assertNotNull(m);
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
         }
 
+        // Setup the consumer and receive the message.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message message = null;
+        for (int i = 0; i < 4; i++) {
+            message = receiveMessage(connection);
+            assertNotNull(message);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // no redelivery, exactly once semantics while prepared
+        message = receiveMessage(connection);
+        assertNull(message);
         assertNoMessagesLeft(connection);
+
+        // rollback so we get redelivery
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        // Begin new transaction for redelivery
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        for (int i = 0; i < 4; i++) {
+            message = receiveMessage(connection);
+            assertNotNull(message);
+        }
+        ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
     }
 
     public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
@@ -292,13 +398,14 @@ public class XARecoveryBrokerTest extend
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+        Message m = null;
         for (int i = 0; i < 4; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
-            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection.send(ack);
         }
+        MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
         // Don't commit
 
         // restart the broker.
@@ -315,7 +422,7 @@ public class XARecoveryBrokerTest extend
 
         // All messages should be re-delivered.
         for (int i = 0; i < 4; i++) {
-            Message m = receiveMessage(connection);
+            m = receiveMessage(connection);
             assertNotNull(m);
         }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Fri May  6 13:27:49 2011
@@ -33,6 +33,9 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +85,7 @@ public class SocketProxy {
     }
 
     public void open() throws Exception {
-        serverSocket = new ServerSocket();
+        serverSocket = createServerSocket(target);
         serverSocket.setReuseAddress(true);
         if (receiveBufferSize > 0) {
             serverSocket.setReceiveBufferSize(receiveBufferSize);
@@ -101,6 +104,24 @@ public class SocketProxy {
         closed = new CountDownLatch(1);
     }
 
+    private boolean isSsl(URI target) {
+        return "ssl".equals(target.getScheme());
+    }
+
+    private ServerSocket createServerSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLServerSocketFactory.getDefault().createServerSocket();
+        }
+        return new ServerSocket();
+    }
+
+    private Socket createSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLSocketFactory.getDefault().createSocket();
+        }
+        return new Socket();
+    }
+
     public URI getUrl() {
         return proxyUrl;
     }
@@ -226,7 +247,7 @@ public class SocketProxy {
 
         public Bridge(Socket socket, URI target) throws Exception {
             receiveSocket = socket;
-            sendSocket = new Socket();
+            sendSocket = createSocket(target);
             if (receiveBufferSize > 0) {
                 sendSocket.setReceiveBufferSize(receiveBufferSize);
             }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java Fri May  6 13:27:49 2011
@@ -21,8 +21,11 @@ import java.io.IOException;
 import javax.jms.ConnectionFactory;
 import javax.jms.Session;
 import javax.jms.JMSException;
+import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
+import javax.transaction.xa.XAResource;
+import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -116,16 +119,51 @@ public class ActiveMQResourceManager {
                     rm.getResourceName() != null && !"".equals(rm.getResourceName());
         }
 
-        public static boolean recover(ActiveMQResourceManager rm) throws IOException {
+        public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
             if (isRecoverable(rm)) {
                 try {
-                    ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
+                    final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
                     ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
-                    ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
+                    final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
                     NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
 
                     RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
-                    rtxManager.recoverResourceManager(namedXaResource);
+                    rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
+
+                        @Override
+                        public String getName() {
+                            return rm.getResourceName();
+                        }
+
+                        @Override
+                        public NamedXAResource getNamedXAResource() throws SystemException {
+                            try {
+                                final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
+                                final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
+                                activeConn.start();
+                                LOGGER.debug("new namedXAResource's connection: " + activeConn);
+
+                                return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
+                            } catch (Exception e) {
+                                SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
+                                se.initCause(e);
+                                LOGGER.error(se.getLocalizedMessage(), se);
+                                throw se;
+                            }
+                        }
+
+                        @Override
+                        public void returnNamedXAResource(NamedXAResource namedXaResource) {
+                            if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
+                                try {
+                                    LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
+                                    ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
+                                } catch (Exception ignored) {
+                                    LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
+                                }
+                            }
+                        }
+                    });
                     return true;
                 } catch (JMSException e) {
                   throw IOExceptionSupport.create(e);
@@ -136,4 +174,11 @@ public class ActiveMQResourceManager {
         }
     }
 
+    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
+        final ActiveMQConnection connection;
+        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
+            super(xaResource, name);
+            this.connection = connection;
+        }
+    }
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java Fri May  6 13:27:49 2011
@@ -90,42 +90,52 @@ public class PooledSession implements Se
         if (!ignoreClose) {
             // TODO a cleaner way to reset??
 
-            // lets reset the session
-            getInternalSession().setMessageListener(null);
+            boolean invalidate = false;
+            try {
+                // lets reset the session
+                getInternalSession().setMessageListener(null);
+
+                // Close any consumers and browsers that may have been created.
+                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
+                    MessageConsumer consumer = iter.next();
+                    consumer.close();
+                }
 
-            // Close any consumers and browsers that may have been created.
-            for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
-                MessageConsumer consumer = iter.next();
-                consumer.close();
-            }
-            consumers.clear();
+                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
+                    QueueBrowser browser = iter.next();
+                    browser.close();
+                }
 
-            for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
-                QueueBrowser browser = iter.next();
-                browser.close();
+                if (transactional && !isXa) {
+                    try {
+                        getInternalSession().rollback();
+                    } catch (JMSException e) {
+                        invalidate = true;
+                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
+                    }
+                }
+            } catch (JMSException ex) {
+                invalidate = true;
+                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
+            } finally {
+                consumers.clear();
+                browsers.clear();
             }
-            browsers.clear();
-
-            if (transactional && !isXa) {
-                try {
-                    getInternalSession().rollback();
-                } catch (JMSException e) {
-                    LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
-
-                    // lets close the session and not put the session back into
-                    // the pool
+            if (invalidate) {
+                // lets close the session and not put the session back into
+                // the pool
+                if (session != null) {
                     try {
                         session.close();
                     } catch (JMSException e1) {
-                        LOG.trace("Ignoring exception as discarding session: " + e1, e1);
+                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
                     }
                     session = null;
-                    sessionPool.invalidateSession(this);
-                    return;
                 }
+                sessionPool.invalidateSession(this);
+            } else {
+                sessionPool.returnSession(this);
             }
-
-            sessionPool.returnSession(this);
         }
     }
 

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1100208&r1=1100207&r2=1100208&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Fri May  6 13:27:49 2011
@@ -899,7 +899,7 @@
       <dependency>
         <groupId>org.apache.geronimo.components</groupId>
         <artifactId>geronimo-transaction</artifactId>
-        <version>2.1</version>
+        <version>2.2.1</version>
       </dependency>
       
       <!-- FTP support for BlobMessages -->