You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/17 00:05:56 UTC

[2/2] qpid-jms git commit: QPIDJMS-126 Adds support for dealing with the coordinator link being closed either during tx operations or on commit / rollback. Handling of message acknowledge dropping during link downtime is not implemented yet.

QPIDJMS-126  Adds support for dealing with the coordinator link being
closed either during tx operations or on commit / rollback.  Handling of
message acknowledge dropping during link downtime is not implemented
yet.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/974a8510
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/974a8510
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/974a8510

Branch: refs/heads/master
Commit: 974a8510bf95d48fe9174c25ad178198145771e1
Parents: 69d2273
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 16 18:05:45 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 16 18:05:45 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    |   3 -
 .../qpid/jms/meta/JmsAbstractResourceId.java    |  11 +
 .../org/apache/qpid/jms/meta/JmsResourceId.java |  16 ++
 .../jms/provider/amqp/AmqpAbstractResource.java |  16 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  10 +-
 .../jms/provider/amqp/AmqpFixedProducer.java    |   7 +
 .../qpid/jms/provider/amqp/AmqpSession.java     |  14 +-
 .../qpid/jms/provider/amqp/AmqpSupport.java     |   4 +
 .../provider/amqp/AmqpTransactionContext.java   | 248 ++++++++--------
 .../amqp/AmqpTransactionCoordinator.java        | 209 ++++++++++++++
 .../amqp/builders/AmqpConnectionBuilder.java    |   2 +-
 .../amqp/builders/AmqpSessionBuilder.java       |  30 --
 .../builders/AmqpTransactionContextBuilder.java |  66 -----
 .../AmqpTransactionCoordinatorBuilder.java      |  66 +++++
 .../integration/ConnectionIntegrationTest.java  |   8 +-
 .../QueueBrowserIntegrationTest.java            |   4 +-
 .../jms/integration/SessionIntegrationTest.java |   4 +-
 .../TransactionsIntegrationTest.java            | 280 ++++++++++---------
 .../failover/FailoverIntegrationTest.java       |  23 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 131 +++++++++
 20 files changed, 745 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index cf5043e..40a4ad1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -284,7 +284,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         // work from getting queued on the provider needlessly.
         lock.writeLock().tryLock();
         try {
-            LOG.info("onConnectionRecovery starting new transaction to replace old one.");
 
             // On connection recover we open a new TX to replace the existing one.
             transactionId = connection.getNextTransactionId();
@@ -293,8 +292,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
             provider.create(transaction, request);
             request.sync();
 
-            LOG.info("onConnectionRecovery started new transaction to replace old one.");
-
             // It is ok to use the newly created TX from here if the TX never had any
             // work done within it otherwise we want the next commit to fail.
             failed = hasWork;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
index 549e4c9..3f891e2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
@@ -22,6 +22,7 @@ package org.apache.qpid.jms.meta;
 public abstract class JmsAbstractResourceId implements JmsResourceId {
 
     protected transient Object providerHint;
+    protected transient Object providerContext;
     protected transient int hashCode;
 
     @Override
@@ -33,4 +34,14 @@ public abstract class JmsAbstractResourceId implements JmsResourceId {
     public Object getProviderHint() {
         return providerHint;
     }
+
+    @Override
+    public void setProviderContext(Object context) {
+        this.providerContext = context;
+    }
+
+    @Override
+    public Object getProviderContext() {
+        return providerContext;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
index 5dd4361..d6de5fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
@@ -37,4 +37,20 @@ public interface JmsResourceId {
      */
     Object getProviderHint();
 
+    /**
+     * Allows a Provider to embed a bit of Context for later use.  The context
+     * can be some state data needed between asynchronous requests etc.
+     *
+     * @param value
+     *      The value to add as context for this Id.
+     */
+    void setProviderContext(Object value);
+
+    /**
+     * Return the previously stored Provider context object.
+     *
+     * @return the previously stored Provider context object.
+     */
+    Object getProviderContext();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 5474d75..fe17a63 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -105,9 +105,9 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         getEndpoint().free();
         getEndpoint().setContext(null);
 
-        if (this.closeRequest != null) {
-            this.closeRequest.onSuccess();
-            this.closeRequest = null;
+        if (closeRequest != null) {
+            closeRequest.onSuccess();
+            closeRequest = null;
         }
     }
 
@@ -145,11 +145,15 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     //----- Access methods ---------------------------------------------------//
 
     public E getEndpoint() {
-        return this.endpoint;
+        return endpoint;
     }
 
     public R getResourceInfo() {
-        return this.resourceInfo;
+        return resourceInfo;
+    }
+
+    public AmqpResourceParent getParent() {
+        return parent;
     }
 
     //----- Endpoint state access methods ------------------------------------//
@@ -163,7 +167,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     }
 
     public boolean isAwaitingClose() {
-        return this.closeRequest != null;
+        return closeRequest != null;
     }
 
     public EndpointState getLocalState() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index d2df866..9d95fa4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -523,16 +523,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     public void preRollback() {
     }
 
-    /**
-     * @throws Exception if an error occurs while performing this action.
-     */
-    public void postCommit() throws Exception {
+    public void postCommit() {
     }
 
-    /**
-     * @throws Exception if an error occurs while performing this action.
-     */
-    public void postRollback() throws Exception {
+    public void postRollback() {
     }
 
     //----- Inner classes used in message pull operations --------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 47cee33..e2dbc39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -100,6 +100,13 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        // If the transaction has failed due to remote termination etc then we just indicate
+        // the send has succeeded until the a new transaction is started.
+        if (session.isTransacted() && session.isTransactionFailed()) {
+            request.onSuccess();
+            return;
+        }
+
         JmsMessageFacade facade = envelope.getMessage().getFacade();
 
         LOG.trace("Producer sending message: {}", envelope);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 28fe3ea..8961258 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -42,7 +42,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
 
     private final AmqpConnection connection;
-    private AmqpTransactionContext txContext;
+    private final AmqpTransactionContext txContext;
 
     private final Map<JmsConsumerId, AmqpConsumer> consumers = new HashMap<JmsConsumerId, AmqpConsumer>();
 
@@ -50,6 +50,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
         super(info, session, connection);
 
         this.connection = connection;
+
+        if (info.isTransacted()) {
+            txContext = new AmqpTransactionContext(this, info);
+        } else {
+            txContext = null;
+        }
     }
 
     /**
@@ -198,8 +204,6 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
         if (resource instanceof AmqpConsumer) {
             AmqpConsumer consumer = (AmqpConsumer) resource;
             consumers.put(consumer.getConsumerId(), consumer);
-        } else if (resource instanceof AmqpTransactionContext) {
-            txContext = (AmqpTransactionContext) resource;
         } else {
             connection.addChildResource(resource);
         }
@@ -262,6 +266,10 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
         return getResourceInfo().isTransacted();
     }
 
+    public boolean isTransactionFailed() {
+        return txContext == null ? false : txContext.isTransactionFailed();
+    }
+
     boolean isAsyncAck() {
         return getResourceInfo().isSendAcksAsync() || isTransacted();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 624a26b..1ec2bfa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -23,10 +23,12 @@ import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
+import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -98,6 +100,8 @@ public class AmqpSupport {
                 remoteError = new JMSSecurityException(message);
             } else if (error.equals(AmqpError.NOT_FOUND)) {
                 remoteError = new InvalidDestinationException(message);
+            } else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
+                remoteError = new TransactionRolledBackException(message);
             } else if (error.equals(ConnectionError.REDIRECT)) {
                 remoteError = createRedirectException(error, message, errorCondition);
             } else if (error.equals(AmqpError.INVALID_FIELD)) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 648b896..eb3447c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -17,27 +17,16 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
-import java.nio.BufferOverflowException;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
 import javax.jms.IllegalStateException;
-import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpTransactionCoordinatorBuilder;
 import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transaction.Declare;
-import org.apache.qpid.proton.amqp.transaction.Declared;
-import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,22 +36,15 @@ import org.slf4j.LoggerFactory;
  * The Transaction will carry a JmsTransactionId while the Transaction is open, once a
  * transaction has been committed or rolled back the Transaction Id is cleared.
  */
-public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Sender> {
+public class AmqpTransactionContext implements AmqpResourceParent {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
 
-    private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
-    private static final Boolean COMMIT_MARKER = Boolean.TRUE;
-
-    private final byte[] OUTBOUND_BUFFER = new byte[64];
-
     private final AmqpSession session;
-    private JmsTransactionId current;
-    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
     private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
 
-    private Delivery pendingDelivery;
-    private AsyncResult pendingRequest;
+    private JmsTransactionId current;
+    private AmqpTransactionCoordinator coordinator;
 
     /**
      * Creates a new AmqpTransactionContext instance.
@@ -71,131 +53,146 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
      *        The session that owns this transaction context.
      * @param resourceInfo
      *        The resourceInfo that defines this transaction context.
-     * @param sender
-     *        The local sender endpoint for this transaction context.
      */
-    public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo, Sender sender) {
-        super(resourceInfo, sender, session);
-
+    public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo) {
         this.session = session;
     }
 
-    @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
-        try {
-            if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
-                DeliveryState state = pendingDelivery.getRemoteState();
-                if (state instanceof Declared) {
-                    Declared declared = (Declared) state;
-                    current.setProviderHint(declared.getTxnId());
-                    pendingDelivery.settle();
-                    LOG.debug("New TX started: {}", current.getProviderHint());
-                    AsyncResult request = this.pendingRequest;
-                    this.pendingRequest = null;
-                    this.pendingDelivery = null;
-                    request.onSuccess();
-                } else if (state instanceof Rejected) {
-                    LOG.debug("Last TX request failed: {}", current.getProviderHint());
-                    pendingDelivery.settle();
-                    Rejected rejected = (Rejected) state;
-                    Exception cause = AmqpSupport.convertToException(rejected.getError());
-                    TransactionRolledBackException ex = new TransactionRolledBackException(cause.getMessage());
-                    AsyncResult request = this.pendingRequest;
-                    this.current = null;
-                    this.pendingRequest = null;
-                    this.pendingDelivery = null;
-                    postRollback();
-                    request.onFailure(ex);
-                } else {
-                    LOG.debug("Last TX request succeeded: {}", current.getProviderHint());
-                    pendingDelivery.settle();
-                    AsyncResult request = this.pendingRequest;
-                    if (pendingDelivery.getContext() != null) {
-                        if (pendingDelivery.getContext().equals(COMMIT_MARKER)) {
-                            postCommit();
-                        } else {
-                            postRollback();
-                        }
-                    }
-                    this.current = null;
-                    this.pendingRequest = null;
-                    this.pendingDelivery = null;
-                    request.onSuccess();
-                }
-            }
-
-            super.processDeliveryUpdates(provider);
-        } catch (Exception e) {
-            throw IOExceptionSupport.create(e);
-        }
-    }
-
-    public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+    public void begin(final JmsTransactionId txId, final AsyncResult request) throws Exception {
         if (current != null) {
             throw new IOException("Begin called while a TX is still Active.");
         }
 
-        Message message = Message.Factory.create();
-        Declare declare = new Declare();
-        message.setBody(new AmqpValue(declare));
+        final AsyncResult declareCompletion = new AsyncResult() {
+
+            @Override
+            public void onSuccess() {
+                current = txId;
+                request.onSuccess();
+            }
+
+            @Override
+            public void onFailure(Throwable result) {
+                current = null;
+                request.onFailure(result);
+            }
+
+            @Override
+            public boolean isComplete() {
+                return current != null;
+            }
+        };
+
+
+        if (coordinator == null || coordinator.isClosed()) {
+            AmqpTransactionCoordinatorBuilder builder =
+                new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
+            builder.buildResource(new AsyncResult() {
+
+                @Override
+                public void onSuccess() {
+                    try {
+                        coordinator.declare(txId, declareCompletion);
+                    } catch (Exception e) {
+                        request.onFailure(e);
+                    }
+                }
 
-        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
-        pendingRequest = request;
-        current = txId;
+                @Override
+                public void onFailure(Throwable result) {
+                    request.onFailure(result);
+                }
 
-        sendTxCommand(message);
+                @Override
+                public boolean isComplete() {
+                    return request.isComplete();
+                }
+            });
+        } else {
+            coordinator.declare(txId, declareCompletion);
+        }
     }
 
-    public void commit(AsyncResult request) throws Exception {
+    public void commit(final AsyncResult request) throws Exception {
         if (current == null) {
             throw new IllegalStateException("Commit called with no active Transaction.");
         }
 
         preCommit();
 
-        Message message = Message.Factory.create();
-        Discharge discharge = new Discharge();
-        discharge.setFail(false);
-        discharge.setTxnId((Binary) current.getProviderHint());
-        message.setBody(new AmqpValue(discharge));
+        LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+        coordinator.discharge(current, new AsyncResult() {
+
+            @Override
+            public void onSuccess() {
+                current = null;
+                postCommit();
+                request.onSuccess();
+            }
+
+            @Override
+            public void onFailure(Throwable result) {
+                current = null;
+                postCommit();
+                request.onFailure(result);
+            }
 
-        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
-        pendingDelivery.setContext(COMMIT_MARKER);
-        pendingRequest = request;
+            @Override
+            public boolean isComplete() {
+                return current == null;
+            }
 
-        sendTxCommand(message);
+        }, true);
     }
 
-    public void rollback(AsyncResult request) throws Exception {
+    public void rollback(final AsyncResult request) throws Exception {
         if (current == null) {
             throw new IllegalStateException("Rollback called with no active Transaction.");
         }
 
         preRollback();
 
-        Message message = Message.Factory.create();
-        Discharge discharge = new Discharge();
-        discharge.setFail(true);
-        discharge.setTxnId((Binary) current.getProviderHint());
-        message.setBody(new AmqpValue(discharge));
+        LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
+        coordinator.discharge(current, new AsyncResult() {
 
-        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
-        pendingDelivery.setContext(ROLLBACK_MARKER);
-        pendingRequest = request;
+            @Override
+            public void onSuccess() {
+                current = null;
+                postRollback();
+                request.onSuccess();
+            }
+
+            @Override
+            public void onFailure(Throwable result) {
+                current = null;
+                postRollback();
+                request.onFailure(result);
+            }
 
-        sendTxCommand(message);
+            @Override
+            public boolean isComplete() {
+                return current == null;
+            }
+
+        }, false);
     }
 
+    //----- Context utility methods ------------------------------------------//
+
     public void registerTxConsumer(AmqpConsumer consumer) {
-        this.txConsumers.add(consumer);
+        txConsumers.add(consumer);
     }
 
     public AmqpSession getSession() {
-        return this.session;
+        return session;
     }
 
     public JmsTransactionId getTransactionId() {
-        return this.current;
+        return current;
+    }
+
+    public boolean isTransactionFailed() {
+        return coordinator == null ? false : coordinator.isClosed();
     }
 
     public Binary getAmqpTransactionId() {
@@ -211,19 +208,21 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         return this.session.getSessionId() + ": txContext";
     }
 
-    private void preCommit() throws Exception {
+    //----- Transaction pre / post completion --------------------------------//
+
+    private void preCommit() {
         for (AmqpConsumer consumer : txConsumers) {
             consumer.preCommit();
         }
     }
 
-    private void preRollback() throws Exception {
+    private void preRollback() {
         for (AmqpConsumer consumer : txConsumers) {
             consumer.preRollback();
         }
     }
 
-    private void postCommit() throws Exception {
+    private void postCommit() {
         for (AmqpConsumer consumer : txConsumers) {
             consumer.postCommit();
         }
@@ -231,7 +230,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         txConsumers.clear();
     }
 
-    private void postRollback() throws Exception {
+    private void postRollback() {
         for (AmqpConsumer consumer : txConsumers) {
             consumer.postRollback();
         }
@@ -239,20 +238,19 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         txConsumers.clear();
     }
 
-    private void sendTxCommand(Message message) throws IOException {
-        int encodedSize = 0;
-        byte[] buffer = OUTBOUND_BUFFER;
-        while (true) {
-            try {
-                encodedSize = message.encode(buffer, 0, buffer.length);
-                break;
-            } catch (BufferOverflowException e) {
-                buffer = new byte[buffer.length * 2];
-            }
+    //----- Resource Parent event handlers -----------------------------------//
+
+    @Override
+    public void addChildResource(AmqpResource resource) {
+        if (resource instanceof AmqpTransactionCoordinator) {
+            coordinator = (AmqpTransactionCoordinator) resource;
         }
+    }
 
-        Sender sender = getEndpoint();
-        sender.send(buffer, 0, encodedSize);
-        sender.advance();
+    @Override
+    public void removeChildResource(AmqpResource resource) {
+        // We don't clear the coordinator link so that we can refer to it
+        // to check if the current TX has failed due to link closed during
+        // normal operations.
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..12e824a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the AMQP Transaction coordinator link used by the transaction context
+ * of a session to control the lifetime of a given transaction.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionInfo, Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+    private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
+    private static final Boolean COMMIT_MARKER = Boolean.TRUE;
+
+    private final byte[] OUTBOUND_BUFFER = new byte[64];
+
+    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
+
+    private Delivery pendingDelivery;
+    private AsyncResult pendingRequest;
+
+    public AmqpTransactionCoordinator(JmsSessionInfo resourceInfo, Sender endpoint, AmqpResourceParent parent) {
+        super(resourceInfo, endpoint, parent);
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+        try {
+            if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
+                DeliveryState state = pendingDelivery.getRemoteState();
+                JmsTransactionId txId = (JmsTransactionId) pendingDelivery.getContext();
+                if (state instanceof Declared) {
+                    LOG.debug("New TX started: {}", txId);
+                    Declared declared = (Declared) state;
+                    txId.setProviderHint(declared.getTxnId());
+                    pendingRequest.onSuccess();
+                } else if (state instanceof Rejected) {
+                    LOG.debug("Last TX request failed: {}", txId);
+                    Rejected rejected = (Rejected) state;
+                    Exception cause = AmqpSupport.convertToException(rejected.getError());
+                    JMSException failureCause = null;
+                    if (txId.getProviderContext() == COMMIT_MARKER) {
+                        failureCause = new TransactionRolledBackException(cause.getMessage());
+                    } else {
+                        failureCause = new JMSException(cause.getMessage());
+                    }
+
+                    pendingRequest.onFailure(failureCause);
+                } else {
+                    LOG.debug("Last TX request succeeded: {}", txId);
+                    pendingRequest.onSuccess();
+                }
+
+                // Reset state for next TX action.
+                pendingDelivery.settle();
+                pendingRequest = null;
+                pendingDelivery = null;
+            }
+
+            super.processDeliveryUpdates(provider);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    public void declare(JmsTransactionId txId, AsyncResult request) throws Exception {
+        if (txId.getProviderHint() != null) {
+            throw new IllegalStateException("Declar called while a TX is still Active.");
+        }
+
+        if (isClosed()) {
+            request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed"));
+            return;
+        }
+
+        Message message = Message.Factory.create();
+        Declare declare = new Declare();
+        message.setBody(new AmqpValue(declare));
+
+        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(txId);
+        pendingRequest = request;
+
+        sendTxCommand(message);
+    }
+
+    public void discharge(JmsTransactionId txId, AsyncResult request, boolean commit) throws Exception {
+        if (txId.getProviderHint() == null) {
+            throw new IllegalStateException("Discharge called with no active Transaction.");
+        }
+
+        if (isClosed()) {
+            Exception failureCause = null;
+
+            if (commit) {
+                failureCause = new TransactionRolledBackException("Transaction inbout: Coordinator remotely closed");
+            } else {
+                failureCause = new JMSException("Rollback cannot complete: Coordinator remotely closed");
+            }
+
+            request.onFailure(failureCause);
+            return;
+        }
+
+        // Store the context of this action in the transaction ID for later completion.
+        txId.setProviderContext(commit ? COMMIT_MARKER : ROLLBACK_MARKER);
+
+        Message message = Message.Factory.create();
+        Discharge discharge = new Discharge();
+        discharge.setFail(!commit);
+        discharge.setTxnId((Binary) txId.getProviderHint());
+        message.setBody(new AmqpValue(discharge));
+
+        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(txId);
+        pendingRequest = request;
+
+        sendTxCommand(message);
+    }
+
+    //----- Base class overrides ---------------------------------------------//
+
+    @Override
+    public void remotelyClosed(AmqpProvider provider) {
+
+        Exception txnError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+
+        // Alert any pending operation that the link failed to complete the pending
+        // begin / commit / rollback operation.
+        if (pendingRequest != null) {
+            pendingRequest.onFailure(txnError);
+            pendingRequest = null;
+        }
+
+        // Override the base class version because we do not want to propagate
+        // an error up to the client if remote close happens as that is an
+        // acceptable way for the remote to indicate the discharge could not
+        // be applied.
+
+        if (getParent() != null) {
+            getParent().removeChildResource(this);
+        }
+
+        if (getEndpoint() != null) {
+            getEndpoint().close();
+            getEndpoint().free();
+        }
+
+        LOG.debug("Transaction Coordinator link {} was remotely closed", getResourceInfo());
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void sendTxCommand(Message message) throws IOException {
+        int encodedSize = 0;
+        byte[] buffer = OUTBOUND_BUFFER;
+        while (true) {
+            try {
+                encodedSize = message.encode(buffer, 0, buffer.length);
+                break;
+            } catch (BufferOverflowException e) {
+                buffer = new byte[buffer.length * 2];
+            }
+        }
+
+        Sender sender = getEndpoint();
+        sender.send(buffer, 0, encodedSize);
+        sender.advance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index f4c290f..cad66a3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Resource builder responsible for creating and opening an AmqpConnectionSession instance.
+ * Resource builder responsible for creating and opening an AmqpConnection instance.
  */
 public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, AmqpProvider, JmsConnectionInfo, Connection> {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
index d49f64c..6cb9819 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
@@ -17,7 +17,6 @@
 package org.apache.qpid.jms.provider.amqp.builders;
 
 import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpSession;
 import org.apache.qpid.proton.engine.Session;
@@ -32,35 +31,6 @@ public class AmqpSessionBuilder extends AmqpResourceBuilder<AmqpSession, AmqpCon
     }
 
     @Override
-    public void buildResource(final AsyncResult request) {
-
-        AsyncResult opened = request;
-
-        if (getResourceInfo().isTransacted()) {
-            opened = new AsyncResult() {
-
-                @Override
-                public void onSuccess() {
-                    AmqpTransactionContextBuilder builder = new AmqpTransactionContextBuilder(getResource(), getResourceInfo());
-                    builder.buildResource(request);
-                }
-
-                @Override
-                public void onFailure(Throwable result) {
-                    request.onFailure(result);
-                }
-
-                @Override
-                public boolean isComplete() {
-                    return request.isComplete();
-                }
-            };
-        }
-
-        super.buildResource(opened);
-    }
-
-    @Override
     protected Session createEndpoint(JmsSessionInfo resourceInfo) {
         long outgoingWindow = getParent().getProvider().getSessionOutgoingWindow();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
deleted file mode 100644
index 4f9466d..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp.builders;
-
-import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.provider.amqp.AmqpSession;
-import org.apache.qpid.jms.provider.amqp.AmqpTransactionContext;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transaction.TxnCapability;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Sender;
-
-/**
- * Resource builder responsible for creating and opening an AmqpTemporaryDestination instance.
- */
-public class AmqpTransactionContextBuilder extends AmqpResourceBuilder<AmqpTransactionContext, AmqpSession, JmsSessionInfo, Sender> {
-
-    public AmqpTransactionContextBuilder(AmqpSession parent, JmsSessionInfo resourceInfo) {
-        super(parent, resourceInfo);
-    }
-
-    @Override
-    protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
-        Coordinator coordinator = new Coordinator();
-        coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
-        Source source = new Source();
-
-        String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getId().toString();
-
-        Sender sender = getParent().getEndpoint().sender(coordinatorName);
-        sender.setSource(source);
-        sender.setTarget(coordinator);
-        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
-        return sender;
-    }
-
-    @Override
-    protected AmqpTransactionContext createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Sender endpoint) {
-        return new AmqpTransactionContext(parent, resourceInfo, endpoint);
-    }
-
-    @Override
-    protected boolean isClosePending() {
-        // When no link terminus was created, the peer will now detach/close us otherwise
-        // we need to validate the returned remote source prior to open completion.
-        return getEndpoint().getRemoteTarget() == null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
new file mode 100644
index 0000000..6a01491
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpTransactionContext;
+import org.apache.qpid.jms.provider.amqp.AmqpTransactionCoordinator;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpTransactionCoordinator instance.
+ */
+public class AmqpTransactionCoordinatorBuilder extends AmqpResourceBuilder<AmqpTransactionCoordinator, AmqpTransactionContext, JmsSessionInfo, Sender> {
+
+    public AmqpTransactionCoordinatorBuilder(AmqpTransactionContext parent, JmsSessionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
+        Coordinator coordinator = new Coordinator();
+        coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
+        Source source = new Source();
+
+        String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getId().toString();
+
+        Sender sender = getParent().getSession().getEndpoint().sender(coordinatorName);
+        sender.setSource(source);
+        sender.setTarget(coordinator);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        return sender;
+    }
+
+    @Override
+    protected AmqpTransactionCoordinator createResource(AmqpTransactionContext parent, JmsSessionInfo resourceInfo, Sender endpoint) {
+        return new AmqpTransactionCoordinator(resourceInfo, endpoint, parent);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        // When no link terminus was created, the peer will now detach/close us otherwise
+        // we need to validate the returned remote source prior to open completion.
+        return getEndpoint().getRemoteTarget() == null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index f493a1c..a84d260 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -61,11 +61,7 @@ import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.jms.util.MetaDataSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -120,9 +116,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             assertNotNull("Session should not be null", session);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index f2bf986..b13fa00 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -40,7 +40,6 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
@@ -262,8 +261,7 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 9fc563f..ef77481 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -69,7 +69,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
@@ -1211,8 +1210,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index c844e1e..be73403 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -41,9 +41,6 @@ import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
@@ -51,7 +48,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
 import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
@@ -59,17 +55,20 @@ import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests for behavior of Transacted Session operations.
  */
 public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionsIntegrationTest.class);
+
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
     @Test(timeout=20000)
@@ -79,24 +78,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
-            // First expect an unsettled 'declare' transfer to the txn coordinator, and
-            // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
-            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
-            // and reply with accepted and settled disposition to indicate the rollback succeeded.
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            // Closed session should roll-back the TX with a failed discharge
+            testPeer.expectDischarge(txnId, true);
             testPeer.expectEnd();
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -114,22 +102,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which was sent under the current transaction. Check it carries
             // TransactionalState with the above txnId but has no outcome. Respond with a
@@ -152,19 +137,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with rejected and settled disposition to indicate the commit failed
-            Discharge discharge = new Discharge();
-            discharge.setFail(false);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Rejected(), true);
+            testPeer.expectDischarge(txnId, false, new Rejected());
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             try {
                 session.commit();
@@ -183,22 +161,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which was sent under the current transaction. Check it carries
             // TransactionalState with the above txnId but has no outcome. Respond with a
@@ -221,20 +196,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with rejected and settled disposition to indicate the commit failed
-            Discharge discharge = new Discharge();
-            discharge.setFail(false);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
             Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+            testPeer.expectDischarge(txnId, false, commitFailure);
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             try {
                 session.commit();
@@ -268,22 +236,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which was sent under the current transaction. Check it carries
             // TransactionalState with the above txnId but has no outcome. Respond with a
@@ -306,20 +271,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with rejected and settled disposition to indicate the rollback failed
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
             Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+            testPeer.expectDischarge(txnId, true, commitFailure);
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             try {
                 session.rollback();
@@ -362,15 +320,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
@@ -398,19 +353,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the commit succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(false);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            testPeer.expectDischarge(txnId, false);
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             session.commit();
 
@@ -425,22 +373,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which was sent under the current transaction. Check it carries
             // TransactionalState with the above txnId but has no outcome. Respond with a
@@ -481,15 +426,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
@@ -520,19 +462,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            testPeer.expectDischarge(txnId, true);
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             // Expect the messages that were not consumed to be released
             int unconsumed = transferCount - consumeCount;
@@ -558,15 +493,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
@@ -579,7 +511,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which provoked creating the transaction
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
@@ -602,19 +534,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            testPeer.expectDischarge(txnId, true);
 
             // Now expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             // Expect the messages that were not consumed to be released
             for (int i = 1; i <= messageCount; i++) {
@@ -639,15 +564,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
@@ -660,7 +582,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Create a producer to use in provoking creation of the AMQP transaction
             testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
+            MessageProducer producer = session.createProducer(queue);
 
             // Expect the message which provoked creating the transaction
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
@@ -688,19 +610,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            testPeer.expectDischarge(txnId, true);
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             // Expect the messages that were not consumed to be released
             for (int i = 1; i <= messageCount; i++) {
@@ -723,15 +638,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            testPeer.expectCoordinatorAttach();
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
             String queueName = "myQueue";
@@ -752,4 +664,104 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testRollbackErrorCoordinatorClosedOnCommit() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+            testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, false);
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.commit();
+                fail("Transaction should have rolled back");
+            } catch (TransactionRolledBackException ex) {
+                LOG.info("Caught expected TransactionRolledBackException");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testJMSErrorCoordinatorClosedOnRollback() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+            testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, true);
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.rollback();
+                fail("Transaction should have rolled back");
+            } catch (JMSException ex) {
+                LOG.info("Caught expected JMSException");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test // (timeout=20000)
+    public void testSendAfterCoordinatorLinkClosedDuringTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP transaction
+            testPeer.expectSenderAttach();
+
+            // Close the link, the messages should now just get dropped on the floor.
+            testPeer.remotelyCloseLastCoordinatorLink();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            producer.send(session.createMessage());
+
+            // Expect that a new link will be created in order to start the next TX.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            try {
+                session.commit();
+                fail("Commit operation should have failed.");
+            } catch (TransactionRolledBackException jmsTxRb) {
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org