You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/04/26 13:59:23 UTC

activemq git commit: AMQ-6707 - JDBC XA recovery and completion.

Repository: activemq
Updated Branches:
  refs/heads/master 1d2226e6c -> ea70e827c


AMQ-6707 - JDBC XA recovery and completion.

ensure pending transactions are visible for recovery without restart
sync store and cursor size during and after completion
ensure pending messages are not visible to browsers
retain transaction state on JDBC error
add a bunch of new tests around XA completion


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ea70e827
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ea70e827
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ea70e827

Branch: refs/heads/master
Commit: ea70e827c049cee9d455303a8cdb31a64b23763d
Parents: 1d2226e
Author: gtully <ga...@gmail.com>
Authored: Thu Apr 26 14:53:51 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 26 14:58:48 2018 +0100

----------------------------------------------------------------------
 .../activemq/broker/TransactionBroker.java      |  30 +-
 .../activemq/broker/region/Destination.java     |   2 +-
 .../broker/region/DestinationFilter.java        |   4 +-
 .../broker/region/PrefetchSubscription.java     |   9 +-
 .../apache/activemq/broker/region/Queue.java    |  15 +-
 .../apache/activemq/broker/region/Topic.java    |   3 +-
 .../region/cursors/AbstractStoreCursor.java     |   2 -
 .../activemq/store/ProxyTopicMessageStore.java  |  28 +-
 .../store/memory/MemoryTransactionStore.java    |   8 +-
 .../activemq/transaction/XATransaction.java     |  20 +-
 .../util/DefaultIOExceptionHandler.java         |   2 +-
 .../store/jdbc/JDBCPersistenceAdapter.java      |  29 +-
 .../store/jdbc/JDBCTopicMessageStore.java       |   4 +-
 .../store/jdbc/JdbcMemoryTransactionStore.java  |  49 +-
 .../activemq/store/jdbc/TransactionContext.java |   1 +
 .../activemq/store/kahadb/KahaDBStore.java      |  85 ++
 .../activemq/store/kahadb/MessageDatabase.java  |  33 -
 .../broker/JdbcXARecoveryBrokerTest.java        |   1 +
 .../activemq/broker/XARecoveryBrokerTest.java   |   6 +-
 .../store/jdbc/JDBCCommitExceptionTest.java     |   1 +
 .../store/jdbc/JDBCXACommitExceptionTest.java   |  47 +-
 .../activemq/store/jdbc/XACompletionTest.java   | 857 +++++++++++++++++++
 22 files changed, 1106 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index 33426de..7a745b5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.JMSException;
 import javax.transaction.xa.XAException;
 
-import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -64,6 +63,7 @@ public class TransactionBroker extends BrokerFilter {
     // The prepared XA transactions.
     private TransactionStore transactionStore;
     private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
+    final ConnectionContext context = new ConnectionContext();
 
     public TransactionBroker(Broker next, TransactionStore transactionStore) {
         super(next);
@@ -82,7 +82,6 @@ public class TransactionBroker extends BrokerFilter {
     public void start() throws Exception {
         transactionStore.start();
         try {
-            final ConnectionContext context = new ConnectionContext();
             context.setBroker(this);
             context.setInRecoveryMode(true);
             context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
@@ -128,12 +127,11 @@ public class TransactionBroker extends BrokerFilter {
 
     private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
                                                     ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
-        Destination destination =  addDestination(context, amqDestination, false);
-        registerSync(destination, transaction, ack);
+        registerSync(amqDestination, transaction, ack);
     }
 
-    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
-        Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
+    private void registerSync(ActiveMQDestination destination, Transaction transaction, BaseCommand command) {
+        Synchronization sync = new PreparedDestinationCompletion(this, destination, command.isMessage());
         // ensure one per destination in the list
         Synchronization existing = transaction.findMatching(sync);
         if (existing != null) {
@@ -144,10 +142,12 @@ public class TransactionBroker extends BrokerFilter {
     }
 
     static class PreparedDestinationCompletion extends Synchronization {
-        final Destination destination;
+        private final TransactionBroker transactionBroker;
+        final ActiveMQDestination destination;
         final boolean messageSend;
         int opCount = 1;
-        public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
+        public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) {
+            this.transactionBroker = transactionBroker;
             this.destination = destination;
             // rollback relevant to acks, commit to sends
             this.messageSend = messageSend;
@@ -173,21 +173,23 @@ public class TransactionBroker extends BrokerFilter {
         @Override
         public void afterRollback() throws Exception {
             if (!messageSend) {
-                destination.clearPendingMessages();
+                Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
+                dest.clearPendingMessages(opCount);
+                dest.getDestinationStatistics().getMessages().add(opCount);
                 LOG.debug("cleared pending from afterRollback: {}", destination);
             }
         }
 
         @Override
         public void afterCommit() throws Exception {
+            Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
             if (messageSend) {
-                destination.clearPendingMessages();
-                destination.getDestinationStatistics().getEnqueues().add(opCount);
-                destination.getDestinationStatistics().getMessages().add(opCount);
+                dest.clearPendingMessages(opCount);
+                dest.getDestinationStatistics().getEnqueues().add(opCount);
+                dest.getDestinationStatistics().getMessages().add(opCount);
                 LOG.debug("cleared pending from afterCommit: {}", destination);
             } else {
-                destination.getDestinationStatistics().getDequeues().add(opCount);
-                destination.getDestinationStatistics().getMessages().subtract(opCount);
+                dest.getDestinationStatistics().getDequeues().add(opCount);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 7413a14..81e7fa1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -239,7 +239,7 @@ public interface Destination extends Service, Task, Message.MessageDestination {
     boolean isDoOptimzeMessageStorage();
     void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
 
-    public void clearPendingMessages();
+    public void clearPendingMessages(int pendingAdditionsCount);
 
     void duplicateFromStore(Message message, Subscription subscription);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 7f25376..dad8501 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -379,8 +379,8 @@ public class DestinationFilter implements Destination {
     }
 
     @Override
-    public void clearPendingMessages() {
-        next.clearPendingMessages();
+    public void clearPendingMessages(int pendingAdditionsCount) {
+        next.clearPendingMessages(0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index fc68fc1..8f1ac0a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -413,9 +413,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         Destination nodeDest = (Destination) node.getRegionDestination();
                         synchronized (dispatchLock) {
                             getSubscriptionStatistics().getDequeues().increment();
-                            dispatched.remove(node);
-                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
-                            nodeDest.getDestinationStatistics().getInflight().decrement();
+                            if (dispatched.remove(node)) {
+                                // if consumer is removed, dispatched will be empty and inflight will
+                                // already have been adjusted
+                                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                                nodeDest.getDestinationStatistics().getInflight().decrement();
+                            }
                         }
                         contractPrefetchExtension(1);
                         nodeDest.wakeup();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index ff55e2f..4c84713 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -68,6 +68,7 @@ import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
@@ -1248,7 +1249,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             QueueMessageReference ref = (QueueMessageReference) i.next();
             if (ref.isExpired() && (ref.getLockOwner() == null)) {
                 toExpire.add(ref);
-            } else if (l.contains(ref.getMessage()) == false) {
+            } else if (!ref.isAcked() && l.contains(ref.getMessage()) == false) {
                 l.add(ref.getMessage());
             }
         }
@@ -1326,9 +1327,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     }
 
     @Override
-    public void clearPendingMessages() {
+    public void clearPendingMessages(int pendingAdditionsCount) {
         messagesLock.writeLock().lock();
         try {
+            final ActiveMQMessage dummyPersistent = new ActiveMQMessage();
+            dummyPersistent.setPersistent(true);
+            for (int i=0; i<pendingAdditionsCount; i++) {
+                try {
+                    // track the increase in the cursor size w/o reverting to the store
+                    messages.addMessageFirst(dummyPersistent);
+                } catch (Exception ignored) {
+                    LOG.debug("Unexpected exception on tracking pending message additions", ignored);
+                }
+            }
             if (resetNeeded) {
                 messages.gc();
                 messages.reset();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index ff0406e..3ed4aaf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -850,9 +850,10 @@ public class Topic extends BaseDestination implements Task {
 
     /**
      * force a reread of the store - after transaction recovery completion
+     * @param pendingAdditionsCount
      */
     @Override
-    public void clearPendingMessages() {
+    public void clearPendingMessages(int pendingAdditionsCount) {
         dispatchLock.readLock().lock();
         try {
             for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 44b0b27..b5c560b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -79,7 +79,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     @Override
     public void rebase() {
-        resetSize();
         MessageId lastAdded = lastCachedIds[SYNC_ADD];
         if (lastAdded != null) {
             try {
@@ -397,7 +396,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
-        setCacheEnabled(false);
         size++;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index b9b79c9..09b6529 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -29,12 +29,10 @@ import org.apache.activemq.usage.MemoryUsage;
 /**
  * A simple proxy that delegates to another MessageStore.
  */
-public class ProxyTopicMessageStore implements TopicMessageStore {
-
-    final TopicMessageStore delegate;
+public class ProxyTopicMessageStore extends ProxyMessageStore implements TopicMessageStore  {
 
     public ProxyTopicMessageStore(TopicMessageStore delegate) {
-        this.delegate = delegate;
+        super(delegate);
     }
 
     public MessageStore getDelegate() {
@@ -83,40 +81,40 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
 
     @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return delegate.lookupSubscription(clientId, subscriptionName);
+        return ((TopicMessageStore)delegate).lookupSubscription(clientId, subscriptionName);
     }
 
     @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                             MessageId messageId, MessageAck ack) throws IOException {
-        delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
+        ((TopicMessageStore)delegate).acknowledge(context, clientId, subscriptionName, messageId, ack);
     }
 
     @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
-        delegate.addSubscription(subscriptionInfo, retroactive);
+        ((TopicMessageStore)delegate).addSubscription(subscriptionInfo, retroactive);
     }
 
     @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
-        delegate.deleteSubscription(clientId, subscriptionName);
+        ((TopicMessageStore)delegate).deleteSubscription(clientId, subscriptionName);
     }
 
     @Override
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
-        delegate.recoverSubscription(clientId, subscriptionName, listener);
+        ((TopicMessageStore)delegate).recoverSubscription(clientId, subscriptionName, listener);
     }
 
     @Override
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                     MessageRecoveryListener listener) throws Exception {
-        delegate.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
+        ((TopicMessageStore)delegate).recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
     }
 
     @Override
     public void resetBatching(String clientId, String subscriptionName) {
-        delegate.resetBatching(clientId, subscriptionName);
+        ((TopicMessageStore)delegate).resetBatching(clientId, subscriptionName);
     }
 
     @Override
@@ -126,7 +124,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
 
     @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return delegate.getAllSubscriptions();
+        return ((TopicMessageStore)delegate).getAllSubscriptions();
     }
 
     @Override
@@ -136,7 +134,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
 
     @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
-        return delegate.getMessageCount(clientId, subscriberName);
+        return ((TopicMessageStore)delegate).getMessageCount(clientId, subscriberName);
     }
 
     @Override
@@ -230,11 +228,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
     @Override
     public long getMessageSize(String clientId, String subscriberName)
             throws IOException {
-        return delegate.getMessageSize(clientId, subscriberName);
+        return ((TopicMessageStore)delegate).getMessageSize(clientId, subscriberName);
     }
 
     @Override
     public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
-        return delegate.getMessageStoreSubStatistics();
+        return ((TopicMessageStore)delegate).getMessageStoreSubStatistics();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index 5d1cb20..abf8282 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -107,11 +107,12 @@ public class MemoryTransactionStore implements TransactionStore {
                     cmd.run(ctx);
                 }
 
+                persistenceAdapter.commitTransaction(ctx);
+
             } catch (IOException e) {
                 persistenceAdapter.rollbackTransaction(ctx);
                 throw e;
             }
-            persistenceAdapter.commitTransaction(ctx);
         }
     }
 
@@ -267,7 +268,7 @@ public class MemoryTransactionStore implements TransactionStore {
         }
         Tx tx;
         if (wasPrepared) {
-            tx = preparedTransactions.remove(txid);
+            tx = preparedTransactions.get(txid);
         } else {
             tx = inflightTransactions.remove(txid);
         }
@@ -275,6 +276,9 @@ public class MemoryTransactionStore implements TransactionStore {
         if (tx != null) {
             tx.commit();
         }
+        if (wasPrepared) {
+            preparedTransactions.remove(txid);
+        }
         if (postCommit != null) {
             postCommit.run();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
index 9e456f9..dca8c8e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
@@ -72,8 +72,8 @@ public class XATransaction extends Transaction {
         case PREPARED_STATE:
             // 2 phase commit, work done.
             // We would record commit here.
-            setStateFinished();
             storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
+            setStateFinished();
             break;
         default:
             illegalStateTransition("commit");
@@ -88,9 +88,21 @@ public class XATransaction extends Transaction {
         } catch (XAException xae) {
             throw xae;
         } catch (Throwable t) {
-            LOG.warn("Store COMMIT FAILED: ", t);
-            rollback();
-            XAException xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBOTHER);
+            LOG.warn("Store COMMIT FAILED: " + txid, t);
+            XAException xae = null;
+            if (wasPrepared) {
+                // report and await outcome
+                xae = newXAException("STORE COMMIT FAILED: " + t.getMessage(), XAException.XA_RETRY);
+                // fire rollback syncs to revert
+                doPostRollback();
+            } else {
+                try {
+                    rollback();
+                    xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBCOMMFAIL);
+                } catch (Throwable e) {
+                    xae = newXAException("STORE COMMIT FAILED: " + t.getMessage() +". Rolled failed:"  + e.getMessage(), XAException.XA_RBINTEGRITY);
+                }
+            }
             xae.initCause(t);
             throw xae;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index 022707d..7e6d089 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -120,7 +120,7 @@ import org.slf4j.LoggerFactory;
                                                 if (destination instanceof Queue) {
                                                     Queue queue = (Queue)destination;
                                                     if (queue.isResetNeeded()) {
-                                                        queue.clearPendingMessages();
+                                                        queue.clearPendingMessages(0);
                                                     }
                                                 }
                                             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 20430b3..102dec5 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
@@ -84,7 +85,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
     private WireFormat wireFormat = new OpenWireFormat();
     private Statements statements;
     private JDBCAdapter adapter;
-    private MemoryTransactionStore transactionStore;
+    private final JdbcMemoryTransactionStore transactionStore = new JdbcMemoryTransactionStore(this);
     private ScheduledFuture<?> cleanupTicket;
     private int cleanupPeriod = 1000 * 60 * 5;
     private boolean useExternalMessageReferences;
@@ -102,6 +103,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
 
     protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
     protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
+    protected final HashMap<ActiveMQDestination, MessageStore> storeCache = new HashMap<>();
 
     {
         setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
@@ -191,18 +193,26 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
 
     @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
-        if (transactionStore != null) {
-            rc = transactionStore.proxy(rc);
+        MessageStore rc = storeCache.get(destination);
+        if (rc == null) {
+            MessageStore store = transactionStore.proxy(new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit));
+            rc = storeCache.putIfAbsent(destination, store);
+            if (rc == null) {
+                rc = store;
+            }
         }
         return rc;
     }
 
     @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
-        if (transactionStore != null) {
-            rc = transactionStore.proxy(rc);
+        TopicMessageStore rc = (TopicMessageStore) storeCache.get(destination);
+        if (rc == null) {
+            TopicMessageStore store = transactionStore.proxy(new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit));
+            rc = (TopicMessageStore) storeCache.putIfAbsent(destination, store);
+            if (rc == null) {
+                rc = store;
+            }
         }
         return rc;
     }
@@ -220,6 +230,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
                 LOG.error("Failed to remove consumer destination: " + destination, ioe);
             }
         }
+        storeCache.remove(destination);
     }
 
     private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
@@ -243,13 +254,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
      */
     @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
+        storeCache.remove(destination);
     }
 
     @Override
     public TransactionStore createTransactionStore() throws IOException {
-        if (transactionStore == null) {
-            transactionStore = new JdbcMemoryTransactionStore(this);
-        }
         return this.transactionStore;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 3c8ba54..a857bcf 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -152,7 +152,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
 
         public LastRecoveredEntry defaultPriority() {
-            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
+            return perPriority[0];
         }
 
         @Override
@@ -321,7 +321,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
     public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
         final String key = getSubscriptionKey(clientId, subscriptionName);
         LastRecovered recovered = new LastRecovered();
-        recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
+        recovered.perPriority[priority].recovered = sequenceId;
         subscriberLastRecoveredMap.put(key, recovered);
         pendingCompletion.add(key);
         LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 40bee61..6df0860 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -18,16 +18,12 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.TransactionBroker;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
@@ -36,12 +32,9 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
-import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.memory.MemoryTransactionStore;
-import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
 
@@ -57,9 +50,6 @@ import org.apache.activemq.util.DataByteArrayInputStream;
 public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
 
 
-    private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
-    private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination, MessageStore>();
-
     public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
         super(jdbcPersistenceAdapter);
     }
@@ -333,35 +323,34 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
     }
 
     @Override
-    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
-        topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
-    }
-
-    @Override
-    protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
-        queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
-    }
-
-    @Override
     protected void onRecovered(Tx tx) {
         for (RemoveMessageCommand removeMessageCommand: tx.acks) {
             if (removeMessageCommand instanceof LastAckCommand) {
                 LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
-                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
+                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) findMessageStore(lastAckCommand.getMessageAck().getDestination());
                 jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
                 lastAckCommand.setMessageStore(jdbcTopicMessageStore);
             } else {
-                // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
-                // but the sql is non portable to match BLOB with LIKE etc
-                // so we make up for it when we recover the ack
-                ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
-                ((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(queueStores.get(removeMessageCommand.getMessageAck().getDestination()));
+                ((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(findMessageStore(removeMessageCommand.getMessageAck().getDestination()));
             }
         }
         for (AddMessageCommand addMessageCommand : tx.messages) {
-            ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
-            addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination));
+            addMessageCommand.setMessageStore(findMessageStore(addMessageCommand.getMessage().getDestination()));
+        }
+    }
+
+    private MessageStore findMessageStore(ActiveMQDestination destination) {
+        ProxyMessageStore proxyMessageStore = null;
+        try {
+            if (destination.isQueue()) {
+                proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
+            } else {
+                proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+            }
+        } catch (IOException error) {
+            throw new RuntimeException("Failed to find/create message store for destination: " + destination, error);
         }
+        return proxyMessageStore.getDelegate();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index 7b0f61c..ab3bef8 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -243,6 +243,7 @@ public class TransactionContext {
             updateLastAckStatement.close();
             updateLastAckStatement = null;
         }
+        completions.clear();
         connection.rollback();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index b5c3abd..e719686 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -66,6 +66,7 @@ import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
 import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionStore;
@@ -363,11 +364,61 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
         this.forceRecoverIndex = forceRecoverIndex;
     }
 
+    public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException {
+        if (preparedAcks != null) {
+            Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
+            for (MessageAck ack : preparedAcks) {
+                stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
+            }
+            ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
+            for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
+                for (MessageAck ack : preparedAcks) {
+                    if (entry.getKey().equals(ack.getDestination())) {
+                        perStoreAcks.add(ack);
+                    }
+                }
+                entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback);
+                perStoreAcks.clear();
+            }
+        }
+    }
+
+    public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException {
+        Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
+        for (MessageAck ack : preparedAcks) {
+            stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
+        }
+        ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
+        for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
+            for (MessageAck ack : preparedAcks) {
+                if (entry.getKey().equals(ack.getDestination())) {
+                    perStoreAcks.add(ack);
+                }
+            }
+            entry.getValue().trackRecoveredAcks(perStoreAcks);
+            perStoreAcks.clear();
+        }
+    }
+
+    private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
+        ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination));
+        if (store == null) {
+            if (activeMQDestination.isQueue()) {
+                store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
+            } else {
+                store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination);
+            }
+        }
+        return (KahaDBMessageStore) store.getDelegate();
+    }
+
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;
         private final int maxAsyncJobs;
         private final Semaphore localDestinationSemaphore;
+        protected final Set<String> ackedAndPrepared = new HashSet<>();
+        protected final Set<String> rolledBackAcks = new HashSet<>();
 
         double doneTasks, canceledTasks = 0;
 
@@ -383,6 +434,39 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
             return destination;
         }
 
+
+        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
+        // till then they are skipped by the store.
+        // 'at most once' XA guarantee
+        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
+            indexLock.writeLock().lock();
+            try {
+                for (MessageAck ack : acks) {
+                    ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
+                }
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
+        public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
+            if (acks != null) {
+                indexLock.writeLock().lock();
+                try {
+                    for (MessageAck ack : acks) {
+                        final String id = ack.getLastMessageId().toProducerKey();
+                        ackedAndPrepared.remove(id);
+                        if (rollback) {
+                            rolledBackAcks.add(id);
+                            incrementAndAddSizeToStoreStat(dest, 0);
+                        }
+                    }
+                } finally {
+                    indexLock.writeLock().unlock();
+                }
+            }
+        }
+
         @Override
         public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
                 throws IOException {
@@ -739,6 +823,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                            return statistics;
                         }
                     });
+                    recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
                     getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
                     getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
                 } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 94de6ea..26e8cd0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3016,39 +3016,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
     @SuppressWarnings("rawtypes")
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
-    protected final Set<String> ackedAndPrepared = new HashSet<>();
-    protected final Set<String> rolledBackAcks = new HashSet<>();
-
-    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
-    // till then they are skipped by the store.
-    // 'at most once' XA guarantee
-    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
-        this.indexLock.writeLock().lock();
-        try {
-            for (MessageAck ack : acks) {
-                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
-            }
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-    }
-
-    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
-        if (acks != null) {
-            this.indexLock.writeLock().lock();
-            try {
-                for (MessageAck ack : acks) {
-                    final String id = ack.getLastMessageId().toProducerKey();
-                    ackedAndPrepared.remove(id);
-                    if (rollback) {
-                        rolledBackAcks.add(id);
-                    }
-                }
-            } finally {
-                this.indexLock.writeLock().unlock();
-            }
-        }
-    }
 
     @SuppressWarnings("rawtypes")
     private List<Operation> getInflightTx(KahaTransactionInfo info) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
index 4b72538..d787e7a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
@@ -48,6 +48,7 @@ public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
     @Override
     protected void configureBroker(BrokerService broker) throws Exception {
         super.configureBroker(broker);
+        broker.setAdvisorySupport(false);
 
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
         jdbc.setDataSource(dataSource);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index f718803..387e77f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -698,8 +698,8 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
 
         // validate destination depth via jmx
         DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
-        assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize());
-        assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount());
+        assertEquals("enqueue count does not see prepared acks", 0, destinationView.getQueueSize());
+        assertEquals("dequeue count does not see prepared acks", 0, destinationView.getDequeueCount());
 
         connection.request(createCommitTransaction2Phase(connectionInfo, txid));
 
@@ -708,7 +708,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
 
         assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize());
-        assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount());
+        assertEquals("dequeue count does not see commited acks", 4, destinationView.getDequeueCount());
 
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
index 05e7208..6972a14 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
@@ -151,6 +151,7 @@ public class JDBCCommitExceptionTest extends TestCase {
     protected BrokerService createBroker() throws Exception {
 
         BrokerService broker = new BrokerService();
+        broker.setAdvisorySupport(false);
         jdbc = new BrokenPersistenceAdapter();
 
         jdbc.setUseLock(false);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
index b8bb9ee..046ab81 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
@@ -17,13 +17,10 @@
 
 package org.apache.activemq.store.jdbc;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
 import javax.transaction.xa.XAException;
@@ -31,6 +28,9 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.XATransactionId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,7 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
     @Override
     public void setUp() throws Exception {
         super.setUp();
-
+        onePhase = true;
         factory = new ActiveMQXAConnectionFactory(
             connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
     }
@@ -129,4 +129,41 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
     }
 
 
+    public void testCommitSendErrorRecovery() throws Exception {
+
+        XAConnection connection = factory.createXAConnection();
+        connection.start();
+        XASession session = connection.createXASession();
+
+        Destination destination = session.createQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        XAResource resource = session.getXAResource();
+
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+        message.setTransactionId(new XATransactionId(tid));
+        producer.send(message);
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid);
+
+        jdbc.setShouldBreak(true);
+        try {
+            resource.commit(tid, true);
+        } catch (Exception expected) {
+            expected.printStackTrace();
+        }
+
+        // recover
+        Xid[] recovered = resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        jdbc.setShouldBreak(false);
+        resource.commit(recovered[0], false);
+
+        assertEquals("one enque", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ea70e827/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
new file mode 100644
index 0000000..8da0ff6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.management.ObjectName;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Enumeration;
+
+import static org.apache.activemq.util.TestUtils.createXid;
+
+
+@RunWith(value = Parameterized.class)
+public class XACompletionTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(XACompletionTest.class);
+
+    protected ActiveMQXAConnectionFactory factory;
+    protected static final int messagesExpected = 1;
+    protected BrokerService broker;
+    protected String connectionUri;
+
+    @Parameterized.Parameter
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
+
+    @Parameterized.Parameters(name="store={0}")
+    public static Iterable<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{ {TestSupport.PersistenceAdapterChoice.KahaDB},{PersistenceAdapterChoice.JDBC} });
+    }
+
+        @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+    }
+
+    @After
+    public void stopAll() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+
+    @Test
+    public void testStatsAndRedispatchAfterAckPreparedClosed() throws Exception {
+
+        factory = new ActiveMQXAConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + 0);
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(1);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        Message message = consumer.receive(2000);
+        LOG.info("Received : " + message);
+
+        resource.end(tid, XAResource.TMSUCCESS);
+
+        activeMQXAConnection.close();
+
+        dumpMessages();
+
+        dumpMessages();
+
+        LOG.info("Try jmx browse... after commit");
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("size", 1, proxy.getQueueSize());
+
+        LOG.info("Try receive... after rollback");
+        message = regularReceive("TEST");
+
+        assertNotNull("message gone", message);
+    }
+
+    @Test
+    public void testStatsAndBrowseAfterAckPreparedCommitted() throws Exception {
+
+        factory = new ActiveMQXAConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(messagesExpected);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        int messagesReceived = 0;
+
+        for (int i = 0; i < messagesExpected; i++) {
+
+            Message message = null;
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid);
+
+        consumer.close();
+
+        dumpMessages();
+
+        resource.commit(tid, false);
+
+        dumpMessages();
+
+        LOG.info("Try jmx browse... after commit");
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertTrue(proxy.browseMessages().isEmpty());
+        assertEquals("prefetch 0", 0, proxy.getInFlightCount());
+        assertEquals("size 0", 0, proxy.getQueueSize());
+
+        LOG.info("Try browse... after commit");
+        Message browsed = regularBrowseFirst();
+
+
+        assertNull("message gone", browsed);
+
+        LOG.info("Try receive... after commit");
+        Message message = regularReceive("TEST");
+
+        assertNull("message gone", message);
+
+    }
+
+
+    @Test
+    public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
+
+        factory = new ActiveMQXAConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.all=0");
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(10);
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        assertEquals("prefetch 0", 0, proxy.getInFlightCount());
+        assertEquals("size 0", 10, proxy.getQueueSize());
+        assertEquals("size 0", 0, proxy.cursorSize());
+
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        for (int i = 0; i < 5; i++) {
+
+            Message message = null;
+            try {
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid);
+
+        consumer.close();
+
+        dumpMessages();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getInFlightCount() == 0l;
+            }
+        });
+        assertEquals("prefetch", 0, proxy.getInFlightCount());
+        assertEquals("size", 10, proxy.getQueueSize());
+        assertEquals("cursor size", 0, proxy.cursorSize());
+
+        resource.rollback(tid);
+
+        dumpMessages();
+
+        LOG.info("Try jmx browse... after rollback");
+
+        assertEquals(10, proxy.browseMessages().size());
+
+        assertEquals("prefetch", 0, proxy.getInFlightCount());
+        assertEquals("size", 10, proxy.getQueueSize());
+        assertEquals("cursor size", 0, proxy.cursorSize());
+
+        LOG.info("Try browse... after");
+        Message browsed = regularBrowseFirst();
+        assertNotNull("message gone", browsed);
+
+        LOG.info("Try receive... after");
+        for (int i=0; i<10; i++) {
+            Message message = regularReceive("TEST");
+            assertNotNull("message gone", message);
+        }
+    }
+
+    @Test
+    public void testStatsAndConsumeAfterAckPreparedRolledback() throws Exception {
+        // Receives up to 5 of 10 messages in an XA tx, prepares it, restarts the broker, rolls the recovered tx back and expects all 10 to be consumable.
+        factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(10);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        int messagesReceived = 0; // only used for the debug log below; incremented even when receive() returns null
+
+        for (int i = 0; i < 5; i++) {
+
+            Message message = null;
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected); // NOTE(review): logs "of messagesExpected" although this loop is bounded by 5
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid); // the transaction is left prepared; it is not completed before the restart below
+
+        consumer.close();
+        // presumably these are the 5 messages not received in the prepared tx; drainUnack never acknowledges them
+        assertEquals("drain", 5, drainUnack(5, "TEST"));
+
+        dumpMessages();
+
+        broker = restartBroker(); // restartBroker() creates the new broker with delete-on-startup disabled
+
+        assertEquals("redrain", 5, drainUnack(5, "TEST"));
+
+
+        LOG.info("Try consume... after restart");
+        dumpMessages();
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("prefetch", 0, proxy.getInFlightCount());
+        assertEquals("size", 5, proxy.getQueueSize()); // asserted as 5 before the recovered transaction is rolled back
+        assertEquals("cursor size 0", 0, proxy.cursorSize());
+        // a fresh XA connection is needed against the restarted broker
+        factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+
+        activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        xaSession = activeMQXAConnection.createXASession();
+
+        XAResource xaResource = xaSession.getXAResource();
+
+        Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+        xaResource.recover(XAResource.TMNOFLAGS);
+
+        LOG.info("Rollback outcome for ack");
+        xaResource.rollback(xids[0]); // assumes recover() returned at least one prepared xid; fails with an index error otherwise
+
+
+        LOG.info("Try receive... after rollback");
+        for (int i=0;i<10; i++) { // all ten messages are expected to be receivable after the rollback
+            Message message = regularReceive("TEST");
+            assertNotNull("message gone: " + i, message);
+        }
+
+        dumpMessages();
+
+        assertNull("none left", regularReceive("TEST"));
+
+        assertEquals("prefetch", 0, proxy.getInFlightCount());
+        assertEquals("size", 0, proxy.getQueueSize());
+        assertEquals("cursor size", 0, proxy.cursorSize());
+        assertEquals("dq", 10, proxy.getDequeueCount());
+
+    }
+
+    @Test
+    public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception {
+        // Prepares ten single-receive XA transactions over 20 messages, rolls two back before a broker restart and the rest after it, then checks messages are still receivable.
+        factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(20);
+
+
+        for (int i = 0; i < 10; i++) { // one new connection, session and prepared transaction per iteration
+
+            ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+            activeMQXAConnection.start();
+            XASession xaSession = activeMQXAConnection.createXASession();
+
+            Destination destination = xaSession.createQueue("TEST");
+            MessageConsumer consumer = xaSession.createConsumer(destination);
+
+            XAResource resource = xaSession.getXAResource();
+            Xid tid = createXid();
+
+            resource.start(tid, XAResource.TMNOFLAGS);
+
+            Message message = null;
+            try {
+                message = consumer.receive(2000);
+                LOG.info("Received (" + i + ") : ," + message);
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.prepare(tid);
+
+            // no close - b/c messages end up in pagedInPendingDispatch!
+            // activeMQXAConnection.close();
+        }
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        XAResource xaResource = xaSession.getXAResource();
+
+        Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+        xaResource.recover(XAResource.TMNOFLAGS);
+
+        // roll back only the first two recovered transactions before the restart; assumes recover() returned at least two xids
+        xaResource.rollback(xids[0]);
+        xaResource.rollback(xids[1]);
+
+        activeMQXAConnection.close();
+
+
+        LOG.info("RESTART");
+        broker = restartBroker(); // restartBroker() creates the new broker with delete-on-startup disabled
+
+        dumpMessages();
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); // NOTE(review): proxy is not used again in this method
+
+
+        // set maxBatchSize=1 (presumably via the prefetch=1 consumer that is created and closed at once - TODO confirm)
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1); // local variable; from here on it hides the factory assigned at the top of this method
+        javax.jms.Connection connection = factory.createConnection(); // NOTE(review): this connection is never closed in this method
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST");
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.close();
+
+        ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+
+        // recover and roll back every transaction that is still prepared after the restart
+        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+        activeMQXAConnectionFactory.setWatchTopicAdvisories(false);
+        activeMQXAConnection = (ActiveMQXAConnection) activeMQXAConnectionFactory.createXAConnection();
+        activeMQXAConnection.start();
+        xaSession = activeMQXAConnection.createXASession();
+        xaResource = xaSession.getXAResource();
+        xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+        xaResource.recover(XAResource.TMNOFLAGS);
+
+        for (int i=0; i< xids.length; i++) {
+            xaResource.rollback(xids[i]);
+        }
+
+        // another prefetch demand - NOTE(review): the destination option below asks for prefetchSize=2, not 1; confirm which is intended
+        MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("TEST?consumer.prefetchSize=2")); // never read from; it only exists as an open consumer
+
+        LOG.info("Try receive... after rollback");
+        Message message = regularReceiveWith(receiveFactory, "TEST");
+        assertNotNull("message 1: ", message);
+        LOG.info("Received : " + message);
+
+        dumpMessages();
+
+        message = regularReceiveWith(receiveFactory, "TEST");
+        assertNotNull("last message", message);
+        LOG.info("Received : " + message);
+
+    }
+
+    @Test
+    public void testMoveInTwoBranches() throws Exception {
+        // Consumes from TEST under one xid and sends to TEST_MOVE under a second xid that shares the global transaction id, prepares and commits both, then checks the stats of both queues.
+        factory = new ActiveMQXAConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(messagesExpected);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+
+        final Xid tid = createXid();
+        byte[] branch = tid.getBranchQualifier();
+        final byte[] branch2  = Arrays.copyOf(branch, branch.length);
+        branch2[0] = '!'; // overwrite the first byte of the copied branch qualifier (assumes the qualifier is non-empty and does not already start with '!')
+        // second xid: same format id and global transaction id as tid, but with the modified branch qualifier
+        Xid branchTid = new Xid() {
+            @Override
+            public int getFormatId() {
+                return tid.getFormatId();
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return tid.getGlobalTransactionId();
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return branch2;
+            }
+        };
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        int messagesReceived = 0; // only used for the debug log below
+
+        Message message = null;
+
+        for (int i = 0; i < messagesExpected; i++) {
+
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+
+        ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnectionSend.start();
+        XASession xaSessionSend = activeMQXAConnection.createXASession(); // NOTE(review): created from activeMQXAConnection, so activeMQXAConnectionSend is started but otherwise unused - confirm intent
+
+        Destination destinationSend = xaSessionSend.createQueue("TEST_MOVE");
+        MessageProducer producer = xaSessionSend.createProducer(destinationSend);
+
+        XAResource resourceSend = xaSessionSend.getXAResource();
+        resourceSend.start(branchTid, XAResource.TMNOFLAGS);
+
+        ActiveMQMessage toSend = (ActiveMQMessage) xaSessionSend.createTextMessage();
+        toSend.setTransactionId(new XATransactionId(branchTid)); // the message is tagged explicitly with the second xid before sending
+        producer.send(toSend);
+
+        resourceSend.end(branchTid, XAResource.TMSUCCESS);
+        resourceSend.prepare(branchTid);
+
+        resource.prepare(tid); // the send xid is prepared before the consume xid
+
+        consumer.close();
+
+        LOG.info("Prepared");
+        dumpMessages();
+
+        LOG.info("Commit Ack");
+        resource.commit(tid, false); // second argument false: not a one-phase commit, the xid was prepared above
+        dumpMessages();
+
+        LOG.info("Commit Send");
+        resourceSend.commit(branchTid, false);
+        dumpMessages();
+
+
+        LOG.info("Try jmx browse... after commit");
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertTrue(proxy.browseMessages().isEmpty());
+        assertEquals("dq ", 1, proxy.getDequeueCount()); // presumably messagesExpected is 1 here - TODO confirm against the field's value
+        assertEquals("size 0", 0, proxy.getQueueSize());
+
+        ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
+        QueueViewMBean moveProxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("enq", 1, moveProxy.getEnqueueCount());
+        assertEquals("size 1", 1, moveProxy.getQueueSize());
+
+        assertNotNull(regularReceive("TEST_MOVE"));
+
+        assertEquals("size 0", 0, moveProxy.getQueueSize());
+
+    }
+
+
+
+    @Test
+    public void testMoveInTwoBranchesTwoBrokers() throws Exception {
+        // NOTE(review): the body matches testMoveInTwoBranches statement for statement and only one broker/connectionUri is used here - confirm whether a second broker was intended.
+        factory = new ActiveMQXAConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
+
+        factory.setWatchTopicAdvisories(false);
+        sendMessages(messagesExpected);
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        Destination destination = xaSession.createQueue("TEST");
+        MessageConsumer consumer = xaSession.createConsumer(destination);
+
+        XAResource resource = xaSession.getXAResource();
+
+        final Xid tid = createXid();
+        byte[] branch = tid.getBranchQualifier();
+        final byte[] branch2  = Arrays.copyOf(branch, branch.length);
+        branch2[0] = '!'; // overwrite the first byte of the copied branch qualifier (assumes the qualifier is non-empty and does not already start with '!')
+        // second xid: same format id and global transaction id as tid, but with the modified branch qualifier
+        Xid branchTid = new Xid() {
+            @Override
+            public int getFormatId() {
+                return tid.getFormatId();
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return tid.getGlobalTransactionId();
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return branch2;
+            }
+        };
+
+        resource.start(tid, XAResource.TMNOFLAGS); // consume side: receives from TEST happen under tid
+
+        int messagesReceived = 0; // only used for the debug log below
+
+        Message message = null;
+
+        for (int i = 0; i < messagesExpected; i++) {
+
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+
+        ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection) factory.createXAConnection();
+        activeMQXAConnectionSend.start();
+        XASession xaSessionSend = activeMQXAConnection.createXASession(); // NOTE(review): created from activeMQXAConnection, so activeMQXAConnectionSend is started but otherwise unused - confirm intent
+
+        Destination destinationSend = xaSessionSend.createQueue("TEST_MOVE");
+        MessageProducer producer = xaSessionSend.createProducer(destinationSend);
+
+        XAResource resourceSend = xaSessionSend.getXAResource();
+        resourceSend.start(branchTid, XAResource.TMNOFLAGS); // send side: the send to TEST_MOVE happens under branchTid
+
+        ActiveMQMessage toSend = (ActiveMQMessage) xaSessionSend.createTextMessage();
+        toSend.setTransactionId(new XATransactionId(branchTid)); // the message is tagged explicitly with the second xid before sending
+        producer.send(toSend);
+
+        resourceSend.end(branchTid, XAResource.TMSUCCESS);
+        resourceSend.prepare(branchTid);
+
+        resource.prepare(tid); // the send xid is prepared before the consume xid
+
+        consumer.close();
+
+        LOG.info("Prepared");
+        dumpMessages();
+
+        LOG.info("Commit Ack");
+        resource.commit(tid, false); // second argument false: not a one-phase commit, the xid was prepared above
+        dumpMessages();
+
+        LOG.info("Commit Send");
+        resourceSend.commit(branchTid, false);
+        dumpMessages();
+
+
+        LOG.info("Try jmx browse... after commit");
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertTrue(proxy.browseMessages().isEmpty());
+        assertEquals("dq ", 1, proxy.getDequeueCount()); // presumably messagesExpected is 1 here - TODO confirm against the field's value
+        assertEquals("size 0", 0, proxy.getQueueSize());
+
+        ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
+        QueueViewMBean moveProxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("enq", 1, moveProxy.getEnqueueCount());
+        assertEquals("size 1", 1, moveProxy.getQueueSize());
+
+        assertNotNull(regularReceive("TEST_MOVE"));
+
+        assertEquals("size 0", 0, moveProxy.getQueueSize());
+
+    }
+
+
+    /**
+     * Receives a single message from the named queue using a plain
+     * {@link ActiveMQConnectionFactory} built from the current connection URI.
+     *
+     * @param qName name of the queue to receive from
+     * @return the received message, or {@code null} if none arrived within the
+     *         timeout applied by {@code regularReceiveWith}
+     * @throws Exception on any JMS failure
+     */
+    private Message regularReceive(String qName) throws Exception {
+        return regularReceiveWith(new ActiveMQConnectionFactory(connectionUri), qName);
+    }
+
+    private Message regularReceiveWith(ActiveMQConnectionFactory factory, String qName) throws Exception {
+        javax.jms.Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(qName);
+            MessageConsumer consumer = session.createConsumer(destination);
+            return consumer.receive(2000);
+        } finally {
+            connection.close();
+        }
+    }
+
+    private int drainUnack(int limit, String qName) throws Exception {
+        int drained = 0;
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit);
+        javax.jms.Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Destination destination = session.createQueue(qName);
+            MessageConsumer consumer = session.createConsumer(destination);
+            while (drained < limit && consumer.receive(2000) != null) {
+                drained++;
+            };
+            consumer.close();
+        } finally {
+            connection.close();
+        }
+        return drained;
+    }
+
+    /**
+     * Browses queue {@code TEST} over a connection created from the
+     * {@code factory} field and returns the first message the browser exposes.
+     *
+     * @return the first browsable message, or {@code null} if the browser's
+     *         enumeration is empty
+     * @throws Exception on any JMS failure
+     */
+    private Message regularBrowseFirst() throws Exception {
+        javax.jms.Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue("TEST");
+            QueueBrowser browser = session.createBrowser(destination);
+            // wildcard instead of the raw type; the element is still cast explicitly below
+            Enumeration<?> e = browser.getEnumeration();
+            if (e.hasMoreElements()) {
+                return (Message) e.nextElement();
+            }
+            return null;
+        } finally {
+            // closing the connection also releases the session and browser
+            connection.close();
+        }
+    }
+    protected void sendMessages(int messagesExpected) throws Exception { // sends messagesExpected messages to queue "TEST" through the factory field
+        sendMessagesWith(factory, messagesExpected);
+    }
+
+    /**
+     * Sends the requested number of persistent text messages to queue
+     * {@code TEST} over a connection created from the given factory.
+     * <p>
+     * The connection is closed even when creating the session or sending fails,
+     * so a failing send does not leave a connection open against the broker.
+     *
+     * @param factory          connection factory to create the connection from
+     * @param messagesExpected number of messages to send
+     * @throws Exception on any JMS failure
+     */
+    protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception {
+        javax.jms.Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            for (int i = 0; i < messagesExpected; i++) {
+                LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
+                producer.send(session.createTextMessage("test message " + (i + 1)));
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Logs every row of the {@code ACTIVEMQ_MSGS} table (id, broker sequence
+     * id, XID and the unmarshalled message) at info level.
+     * <p>
+     * Does nothing unless the configured persistence adapter choice is JDBC.
+     * The JDBC connection, statement and result set are released through
+     * try-with-resources, so they are closed even if the query or the
+     * unmarshalling of a row fails.
+     *
+     * @throws Exception on SQL or unmarshalling failure
+     */
+    protected void dumpMessages() throws Exception {
+
+        if (persistenceAdapterChoice.compareTo(PersistenceAdapterChoice.JDBC) != 0) {
+            return;
+        }
+        WireFormat wireFormat = new OpenWireFormat();
+        try (java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
+             PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
+             ResultSet result = statement.executeQuery()) {
+            LOG.info("Messages in broker db...");
+            while (result.next()) {
+                long id = result.getLong(1);
+                org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
+                String xid = result.getString(3);
+                LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
+            }
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception { // creates and starts a broker that deletes all stored messages on startup
+        return createBroker(true);
+    }
+
+    protected BrokerService restartBroker() throws Exception { // stops the current broker and starts a replacement that keeps stored messages
+        broker.stop();
+        broker.waitUntilStopped(); // wait for the old broker to finish stopping before a new one is created
+        return createBroker(false); // false: do not delete messages on startup; the caller must keep the returned broker
+    }
+
+    protected BrokerService createBroker(boolean del) throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.setDeleteAllMessagesOnStartup(del);
+
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+        broker.setPersistent(true);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+        return broker;
+    }
+}