You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/01 16:32:51 UTC

svn commit: r1345202 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/store/ main/java/org/apache/activ...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Fri Jun  1 14:32:50 2012
@@ -30,6 +30,7 @@ import org.apache.activemq.store.ProxyTo
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.jdbc.JDBCMessageStore;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,16 +46,16 @@ import java.util.concurrent.Future;
  */
 public class MemoryTransactionStore implements TransactionStore {
 
-    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
-    ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
-    final PersistenceAdapter persistenceAdapter;
+    protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
+    protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
+    protected final PersistenceAdapter persistenceAdapter;
 
     private boolean doingRecover;
 
     public class Tx {
-        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+        public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
-        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+        public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
 
         public void add(AddMessageCommand msg) {
             messages.add(msg);
@@ -114,6 +115,8 @@ public class MemoryTransactionStore impl
     public interface AddMessageCommand {
         Message getMessage();
 
+        MessageStore getMessageStore();
+
         void run(ConnectionContext context) throws IOException;
     }
 
@@ -121,6 +124,8 @@ public class MemoryTransactionStore impl
         MessageAck getMessageAck();
 
         void run(ConnectionContext context) throws IOException;
+
+        MessageStore getMessageStore();
     }
 
     public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
@@ -164,7 +169,7 @@ public class MemoryTransactionStore impl
     }
 
     public TopicMessageStore proxy(TopicMessageStore messageStore) {
-        return new ProxyTopicMessageStore(messageStore) {
+        ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
             @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
@@ -204,12 +209,17 @@ public class MemoryTransactionStore impl
                         subscriptionName, messageId, ack);
             }
         };
+        onProxyTopicStore(proxyTopicMessageStore);
+        return proxyTopicMessageStore;
+    }
+
+    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
     }
 
     /**
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void prepare(TransactionId txid) {
+    public void prepare(TransactionId txid) throws IOException {
         Tx tx = inflightTransactions.remove(txid);
         if (tx == null) {
             return;
@@ -226,6 +236,24 @@ public class MemoryTransactionStore impl
         return tx;
     }
 
+    /**
+     * Returns the prepared transaction registered for {@code txid}, registering
+     * a new empty one when none exists yet.
+     *
+     * @param txid id of the prepared transaction
+     * @return the existing or newly registered transaction, never null
+     */
+    public Tx getPreparedTx(TransactionId txid) {
+        Tx tx = preparedTransactions.get(txid);
+        if (tx == null) {
+            // putIfAbsent keeps lookup-or-create atomic so racing callers share one Tx
+            final Tx created = new Tx();
+            final Tx raced = preparedTransactions.putIfAbsent(txid, created);
+            tx = raced != null ? raced : created;
+        }
+        return tx;
+    }
+
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
         if (preCommit != null) {
             preCommit.run();
@@ -248,7 +267,7 @@ public class MemoryTransactionStore impl
     /**
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void rollback(TransactionId txid) {
+    public void rollback(TransactionId txid) throws IOException {
         preparedTransactions.remove(txid);
         inflightTransactions.remove(txid);
     }
@@ -268,12 +287,16 @@ public class MemoryTransactionStore impl
                 Object txid = iter.next();
                 Tx tx = preparedTransactions.get(txid);
                 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+                onRecovered(tx);
             }
         } finally {
             this.doingRecover = false;
         }
     }
 
+    protected void onRecovered(Tx tx) {
+    }
+
     /**
      * @param message
      * @throws IOException
@@ -291,6 +314,11 @@ public class MemoryTransactionStore impl
                     return message;
                 }
 
+                @Override
+                public MessageStore getMessageStore() {
+                    return destination;
+                }
+
                 public void run(ConnectionContext ctx) throws IOException {
                     destination.addMessage(ctx, message);
                 }
@@ -320,13 +348,18 @@ public class MemoryTransactionStore impl
                 public void run(ConnectionContext ctx) throws IOException {
                     destination.removeMessage(ctx, ack);
                 }
+
+                @Override
+                public MessageStore getMessageStore() {
+                    return destination;
+                }
             });
         } else {
             destination.removeMessage(null, ack);
         }
     }
 
-    final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
+    public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
                            final MessageId messageId, final MessageAck ack) throws IOException {
         if (doingRecover) {
             return;
@@ -342,6 +375,11 @@ public class MemoryTransactionStore impl
                 public void run(ConnectionContext ctx) throws IOException {
                     destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
                 }
+
+                @Override
+                public MessageStore getMessageStore() {
+                    return destination;
+                }
             });
         } else {
             destination.acknowledge(null, clientId, subscriptionName, messageId, ack);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java?rev=1345202&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java Fri Jun  1 14:32:50 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+
+/**
+ * Runs the {@link XARecoveryBrokerTest} scenarios against a JDBC persistence
+ * adapter backed by an embedded Derby database.
+ */
+public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    EmbeddedXADataSource dataSource;
+
+    @Override
+    protected void setUp() throws Exception {
+        // created first: configureBroker() reads it (presumably during super.setUp())
+        dataSource = createDataSource();
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // release the embedded database even when the parent teardown throws
+            stopDerby();
+        }
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        broker.setPersistenceAdapter(jdbc);
+    }
+
+    @Override
+    protected void restartBroker() throws Exception {
+        broker.stop();
+        stopDerby();
+        // the previous instance has its shutdown attribute set, so start over with a new one
+        dataSource = createDataSource();
+
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    /**
+     * Builds a data source for the "derbyDb" database with database creation enabled.
+     */
+    private EmbeddedXADataSource createDataSource() {
+        EmbeddedXADataSource ds = new EmbeddedXADataSource();
+        ds.setDatabaseName("derbyDb");
+        ds.setCreateDatabase("create");
+        return ds;
+    }
+
+    /**
+     * Requests a best-effort shutdown of the embedded database; failures are
+     * logged at debug level and never propagated.
+     */
+    private void stopDerby() {
+        LOG.info("STOPPING DB!@!!!!");
+        final EmbeddedDataSource ds = dataSource;
+        if (ds == null) {
+            // nothing to stop, e.g. setUp() failed before creating the data source
+            return;
+        }
+        try {
+            ds.setShutdownDatabase("shutdown");
+            // Derby signals a completed shutdown by throwing from getConnection();
+            // close the connection should one unexpectedly be returned
+            ds.getConnection().close();
+        } catch (Exception expected) {
+            LOG.debug("derby shutdown outcome", expected);
+        }
+    }
+
+    public static Test suite() {
+        return suite(JdbcXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    @Override
+    protected ActiveMQDestination createDestination() {
+        // comma-separated name: presumably a composite of "test" and "special" - confirm
+        return new ActiveMQQueue("test,special");
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri Jun  1 14:32:50 2012
@@ -23,9 +23,13 @@ import javax.management.MalformedObjectN
 import javax.management.ObjectName;
 import junit.framework.Test;
 
+import org.apache.activemq.broker.jmx.DestinationView;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DataArrayResponse;
@@ -37,6 +41,7 @@ import org.apache.activemq.command.Sessi
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.util.JMXSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +53,8 @@ import org.slf4j.LoggerFactory;
  */
 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
+    public boolean prioritySupport = false;
+
     public void testPreparedJmxView() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -96,6 +103,10 @@ public class XARecoveryBrokerTest extend
         dar = (DataArrayResponse)response;
         assertEquals(4, dar.getData().length);
 
+        // validate destination depth via jmx
+        DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
+        assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
+
         TransactionId first = (TransactionId)dar.getData()[0];
         // via jmx, force outcome
         for (int i = 0; i < 4; i++) {
@@ -131,6 +142,16 @@ public class XARecoveryBrokerTest extend
         return proxy;
     }
 
+    private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException {
+
+        ObjectName objectName = new ObjectName("org.apache.activemq:Type=" + (destination.isQueue() ? "Queue" : "Topic") + ",Destination=" +
+                JMXSupport.encodeObjectNamePart(destination.getPhysicalName()) + ",BrokerName=localhost");
+        DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(objectName,
+                DestinationViewMBean.class, true);
+        return proxy;
+
+    }
+
     public void testPreparedTransactionRecoveredOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -213,6 +234,94 @@ public class XARecoveryBrokerTest extend
         assertNoMessagesLeft(connection);
     }
 
+    public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse) response;
+        assertEquals(4, dar.getData().length);
+
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again  to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Commit the prepared transactions.
+        for (int i = 0; i < dar.getData().length; i++) {
+            connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
+        }
+
+        // We should get the committed transactions.
+        for (int i = 0; i < expectedMessageCount(4, destination); i++) {
+            Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+
+    }
+
     public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -260,6 +369,55 @@ public class XARecoveryBrokerTest extend
         assertNoMessagesLeft(connection);
     }
 
+    public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+        }
+
+        // Commit 2 phase
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+        connection.send(createCommitTransaction2Phase(connectionInfo, txid));
+
+        connection.request(closeConnectionInfo(connectionInfo));
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        for (int i = 0; i < expectedMessageCount(4, destination); i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+
     public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -396,6 +554,90 @@ public class XARecoveryBrokerTest extend
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
     }
 
+    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+        addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
+        // REVISIT for kahadb
+        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+            LOG.warn("only works on jdbc");
+            return;
+        }
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // setup durable subs
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        final int messageCount = expectedMessageCount(4, destination);
+        Message m = null;
+        for (int i = 0; i < messageCount; i++) {
+            m = receiveMessage(connection);
+            assertNotNull("unexpected null on: " + i, m);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // no redelivery, exactly once semantics unless there is rollback
+        m = receiveMessage(connection);
+        assertNull(m);
+        assertNoMessagesLeft(connection);
+
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+    }
+
     public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -409,7 +651,8 @@ public class XARecoveryBrokerTest extend
         connection.send(sessionInfo);
         connection.send(producerInfo);
 
-        for (int i = 0; i < 4; i++) {
+        int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
             Message message = createMessage(producerInfo, destination);
             message.setPersistent(true);
             connection.send(message);
@@ -426,13 +669,13 @@ public class XARecoveryBrokerTest extend
             consumerInfo = createConsumerInfo(sessionInfo, dest);
             connection.send(consumerInfo);
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < numMessages; i++) {
                 message = receiveMessage(connection);
                 assertNotNull(message);
             }
 
             // one ack with last received, mimic a beforeEnd synchronization
-            MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+            MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
             ack.setTransactionId(txid);
             connection.send(ack);
         }
@@ -466,7 +709,7 @@ public class XARecoveryBrokerTest extend
         // rollback so we get redelivery
         connection.request(createRollbackTransaction(connectionInfo, txid));
 
-        // Begin new transaction for redelivery
+        LOG.info("new tx for redelivery");
         txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
 
@@ -475,11 +718,11 @@ public class XARecoveryBrokerTest extend
             consumerInfo = createConsumerInfo(sessionInfo, dest);
             connection.send(consumerInfo);
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < numMessages; i++) {
                 message = receiveMessage(connection);
-                assertNotNull(message);
+                assertNotNull("unexpected null on:" + i, message);
             }
-            MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+            MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
             ack.setTransactionId(txid);
             connection.send(ack);
         }
@@ -492,6 +735,180 @@ public class XARecoveryBrokerTest extend
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
     }
 
+    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() {
+        addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+
+        // REVISIT for kahadb
+        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+            LOG.warn("only works on jdbc");
+            return;
+        }
+
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // setup durable subs
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message message = null;
+        for (int i = 0; i < numMessages; i++) {
+            message = receiveMessage(connection);
+            assertNotNull(message);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // no redelivery, exactly once semantics while prepared
+        message = receiveMessage(connection);
+        assertNull(message);
+        assertNoMessagesLeft(connection);
+
+        // rollback so we get redelivery
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < numMessages; i++) {
+            message = receiveMessage(connection);
+            assertNotNull("unexpected null on:" + i, message);
+        }
+        ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+    }
+
+    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() {
+        addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+
+        // REVISIT for kahadb
+        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+            LOG.warn("only works on jdbc");
+            return;
+        }
+
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // setup durable subs
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message message = null;
+        for (int i = 0; i < numMessages; i++) {
+            message = receiveMessage(connection);
+            assertNotNull(message);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // rollback so we get redelivery
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < numMessages; i++) {
+            message = receiveMessage(connection);
+            assertNotNull("unexpected null on:" + i, message);
+        }
+        ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+    }
+
     private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
         return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
     }
@@ -564,6 +981,13 @@ public class XARecoveryBrokerTest extend
         assertNoMessagesLeft(connection);
     }
 
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policyEntry = super.getDefaultPolicy();
+        policyEntry.setPrioritizedMessages(prioritySupport);
+        return policyEntry;
+    }
+
     public static Test suite() {
         return suite(XARecoveryBrokerTest.class);
     }