You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/07 20:22:03 UTC

qpid-jms git commit: QPIDJMS-207 Clean up resource close handling

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4c5b08d15 -> 88a77984d


QPIDJMS-207 Clean up resource close handling

In some cases, timeouts, connection drops, or the closing of a resource
can leave sync requests or async completions blocked or unanswered.  Clean
up the close handling in the AMQP layer to give child resources the
chance to unpark pending operations or signal failure to them.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/88a77984
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/88a77984
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/88a77984

Branch: refs/heads/master
Commit: 88a77984d8ad17ba926dc184e50ba128f218c61a
Parents: 4c5b08d
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Dec 6 14:10:18 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Dec 7 12:47:57 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  7 +-
 .../apache/qpid/jms/JmsConnectionListener.java  |  6 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  8 +--
 .../org/apache/qpid/jms/JmsMessageProducer.java |  8 +--
 .../java/org/apache/qpid/jms/JmsSession.java    | 23 +++---
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |  2 +-
 .../jms/provider/DefaultProviderListener.java   |  2 +-
 .../qpid/jms/provider/ProviderListener.java     |  2 +-
 .../qpid/jms/provider/ProviderWrapper.java      |  2 +-
 .../jms/provider/amqp/AmqpAbstractResource.java | 76 +++++++++-----------
 .../qpid/jms/provider/amqp/AmqpConnection.java  | 17 +++++
 .../provider/amqp/AmqpConnectionSession.java    | 16 ++++-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 17 +++--
 .../jms/provider/amqp/AmqpFixedProducer.java    |  2 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 10 +--
 .../qpid/jms/provider/amqp/AmqpSession.java     |  6 +-
 .../amqp/AmqpTransactionCoordinator.java        |  6 +-
 .../qpid/jms/JmsDefaultConnectionListener.java  |  6 +-
 .../integration/ConsumerIntegrationTest.java    |  5 +-
 .../integration/ProducerIntegrationTest.java    | 75 ++++++++++++++++++-
 .../jms/integration/SessionIntegrationTest.java |  6 +-
 .../SubscriptionsIntegrationTest.java           |  6 +-
 .../TransactionsIntegrationTest.java            | 13 ++--
 .../JmsConsumerPriorityDispatchTest.java        | 12 ++--
 .../jms/discovery/FileWatcherDiscoveryTest.java |  6 +-
 .../jms/discovery/JmsAmqpDiscoveryTest.java     |  6 +-
 .../transactions/JmsTransactedConsumerTest.java |  6 +-
 27 files changed, 223 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index e5ecd72..0837d78 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -179,9 +179,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                     session.shutdown();
                 }
 
-                sessions.clear();
-                tempDestinations.clear();
-
                 if (isConnected() && !failed.get()) {
                     ProviderFuture request = new ProviderFuture();
                     requests.put(request, request);
@@ -205,6 +202,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                     }
                 }
 
+                sessions.clear();
+                tempDestinations.clear();
                 connected.set(false);
                 started.set(false);
                 closing.set(false);
@@ -1268,7 +1267,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     @Override
-    public void onResourceClosed(final JmsResource resource, final Exception cause) {
+    public void onResourceClosed(final JmsResource resource, final Throwable cause) {
         // Closure of the Connection itself is notified via onConnectionFailure
 
         // Run on the connection executor to free the provider to go do more work and avoid

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
index 7aecc44..f9389f4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -82,7 +82,7 @@ public interface JmsConnectionListener {
      * @param cause
      *      The exception that provides additional context on the closure.
      */
-    void onSessionClosed(Session session, Exception cause);
+    void onSessionClosed(Session session, Throwable cause);
 
     /**
      * Called when the MessageConsumer is closed due to remote action or local error detection.
@@ -92,7 +92,7 @@ public interface JmsConnectionListener {
      * @param cause
      *      The exception that provides additional context on the closure.
      */
-    void onConsumerClosed(MessageConsumer consumer, Exception cause);
+    void onConsumerClosed(MessageConsumer consumer, Throwable cause);
 
     /**
      * Called when the MessageProducer is closed due to remote action or local error detection.
@@ -102,6 +102,6 @@ public interface JmsConnectionListener {
      * @param cause
      *      The exception that provides additional context on the closure.
      */
-    void onProducerClosed(MessageProducer producer, Exception cause);
+    void onProducerClosed(MessageProducer producer, Throwable cause);
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 9eb332d..a76398f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -65,7 +65,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
     protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
-    protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
+    protected final AtomicReference<Throwable> failureCause = new AtomicReference<>();
 
     protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
                                  String selector, boolean noLocal) throws JMSException {
@@ -183,7 +183,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         shutdown(null);
     }
 
-    protected void shutdown(Exception cause) throws JMSException {
+    protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
             setFailureCause(cause);
             session.remove(this);
@@ -392,11 +392,11 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         }
     }
 
-    void setFailureCause(Exception failureCause) {
+    void setFailureCause(Throwable failureCause) {
         this.failureCause.set(failureCause);
     }
 
-    Exception getFailureCause() {
+    Throwable getFailureCause() {
         if (failureCause.get() == null) {
             return session.getFailureCause();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 1058225..c61dad0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -52,7 +52,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     protected boolean disableMessageId;
     protected boolean disableTimestamp;
     protected final AtomicLong messageSequence = new AtomicLong();
-    protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
+    protected final AtomicReference<Throwable> failureCause = new AtomicReference<>();
 
     protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
         this.session = session;
@@ -105,7 +105,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         shutdown(null);
     }
 
-    protected void shutdown(Exception cause) throws JMSException {
+    protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
             failureCause.set(cause);
             session.remove(this);
@@ -329,11 +329,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         return producerInfo.getMessageIDBuilder();
     }
 
-    void setFailureCause(Exception failureCause) {
+    void setFailureCause(Throwable failureCause) {
         this.failureCause.set(failureCause);
     }
 
-    Exception getFailureCause() {
+    Throwable getFailureCause() {
         if (failureCause.get() == null) {
             return session.getFailureCause();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 0202d42..6ad8729 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -115,7 +115,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private JmsTransactionContext transactionContext;
     private boolean sessionRecovered;
-    private final AtomicReference<Exception> failureCause = new AtomicReference<Exception>();
+    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
     private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
 
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
@@ -253,8 +253,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     protected void doClose() throws JMSException {
         boolean interrupted = Thread.interrupted();
         shutdown();
-        connection.removeSession(sessionInfo);
         connection.destroyResource(sessionInfo);
+        connection.removeSession(sessionInfo);
         if (interrupted) {
             Thread.currentThread().interrupt();
         }
@@ -275,7 +275,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         shutdown(null);
     }
 
-    protected void shutdown(Exception cause) throws JMSException {
+    protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
             setFailureCause(cause);
             stop();
@@ -289,8 +289,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
             transactionContext.shutdown();
 
+            // Ensure that no asynchronous completion sends remain blocked after close.
             synchronized (sessionInfo) {
                 if (completionExcecutor != null) {
+                    if (cause == null) {
+                        cause = new JMSException("Session closed remotely before message transfer result was notified");
+                    }
+
+                    completionExcecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
                     completionExcecutor.shutdown();
                     try {
                         completionExcecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
@@ -305,16 +311,15 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     //----- Events fired when resource remotely closed due to some error -----//
 
-    void sessionClosed(Exception cause) {
+    void sessionClosed(Throwable cause) {
         try {
-            getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
             shutdown(cause);
         } catch (Throwable error) {
             LOG.trace("Ignoring exception thrown during cleanup of closed session", error);
         }
     }
 
-    JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Exception cause) {
+    JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Throwable cause) {
         LOG.info("A JMS MessageConsumer has been closed: {}", resource);
 
         JmsMessageConsumer consumer = consumers.get(resource.getId());
@@ -330,7 +335,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return consumer;
     }
 
-    JmsMessageProducer producerClosed(JmsProducerInfo resource, Exception cause) {
+    JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) {
         LOG.info("A JMS MessageProducer has been closed: {}", resource);
 
         JmsMessageProducer producer = producers.get(resource.getId());
@@ -1059,11 +1064,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return new JmsProducerId(sessionInfo.getId(), producerIdGenerator.incrementAndGet());
     }
 
-    void setFailureCause(Exception failureCause) {
+    void setFailureCause(Throwable failureCause) {
         this.failureCause.set(failureCause);
     }
 
-    Exception getFailureCause() {
+    Throwable getFailureCause() {
         return failureCause.get();
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 0d78137..412abd4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -38,7 +38,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
 
     public static final long INFINITE = -1;
     public static final long DEFAULT_CONNECT_TIMEOUT = 15000;
-    public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
+    public static final long DEFAULT_CLOSE_TIMEOUT = 60000;
     public static final long DEFAULT_SEND_TIMEOUT = INFINITE;
     public static final long DEFAULT_REQUEST_TIMEOUT = INFINITE;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index d2eb95c..1d98e01 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -65,7 +65,7 @@ public class DefaultProviderListener implements ProviderListener {
     }
 
     @Override
-    public void onResourceClosed(JmsResource resource, Exception cause) {
+    public void onResourceClosed(JmsResource resource, Throwable cause) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index 11a5f6b..a96de4f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -144,7 +144,7 @@ public interface ProviderListener {
      * @param cause
      *        optional exception object that indicates the cause of the close.
      */
-    void onResourceClosed(JmsResource resource, Exception cause);
+    void onResourceClosed(JmsResource resource, Throwable cause);
 
     /**
      * Called to indicate that a some client operation caused or received an

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 3a3d383..65d4447 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -194,7 +194,7 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
-    public void onResourceClosed(JmsResource resource, Exception cause) {
+    public void onResourceClosed(JmsResource resource, Throwable cause) {
         listener.onResourceClosed(resource, cause);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index dbdb53f..d858d40 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -99,6 +99,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
             return;
         }
 
+        LOG.trace("{} requesting close on remote.", this);
+
         closeRequest = request;
 
         // Use close timeout for all resource closures and fallback to the request
@@ -119,13 +121,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
 
                     @Override
                     public void onFailure(Throwable result) {
-                        closeRequest.onFailure(result);
-                        closeRequest = null;
-
-                        // This ensures that the resource gets properly cleaned
-                        // up, the request will have already completed so there
-                        // won't be multiple events fired.
-                        resourceClosed();
+                        closeResource(getParent().getProvider(), result, false);
                     }
 
                     @Override
@@ -139,45 +135,49 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         closeOrDetachEndpoint();
     }
 
-    public void resourceClosed() {
-        getEndpoint().close();
-        getEndpoint().free();
-        getEndpoint().setContext(null);
-
-        if (closeRequest != null) {
-            closeRequest.onSuccess();
-            closeRequest = null;
-        }
-    }
-
-    public void remotelyClosed(AmqpProvider provider) {
-        locallyClosed(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()));
-    }
-
-    public void locallyClosed(AmqpProvider provider, Exception error) {
+    public void closeResource(AmqpProvider provider, Throwable cause, boolean remotelyClosed) {
         if (parent != null) {
             parent.removeChildResource(this);
         }
 
-        if (endpoint != null) {
+        if (getEndpoint() != null) {
             // TODO: if this is a producer/consumer link then we may only be detached,
             // rather than fully closed, and should respond appropriately.
             closeOrDetachEndpoint();
-        }
 
-        // Process the close before moving on to closing down child resources
-        provider.pumpToProtonTransport();
+            // Process the close before moving on to closing down child resources
+            provider.pumpToProtonTransport();
+
+            handleResourceClosure(provider, cause);
 
-        handleResourceClosure(provider, error);
+            // Now clean up after the close has completed if the close is not initiated
+            // from this client, otherwise we need to wait for the remote to respond.
+            if (remotelyClosed) {
+                getEndpoint().free();
+                getEndpoint().setContext(null);
+            }
+        }
 
-        if (getResourceInfo() instanceof JmsConnectionInfo) {
-            provider.fireProviderException(error);
+        if (isAwaitingClose()) {
+            LOG.debug("{} is now closed: ", this);
+            if (cause == null) {
+                closeRequest.onSuccess();
+            } else {
+                closeRequest.onFailure(cause);
+            }
+            closeRequest = null;
         } else {
-            provider.fireResourceClosed(getResourceInfo(), error);
+            if (cause != null) {
+                if (getResourceInfo() instanceof JmsConnectionInfo) {
+                    provider.fireProviderException(cause);
+                } else {
+                    provider.fireResourceClosed(getResourceInfo(), cause);
+                }
+            }
         }
     }
 
-    public void handleResourceClosure(AmqpProvider provider, Exception error) {
+    public void handleResourceClosure(AmqpProvider provider, Throwable error) {
         // Nothing do be done here, subclasses can override as needed.
     }
 
@@ -241,21 +241,15 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
 
     @Override
     public void processRemoteDetach(AmqpProvider provider) throws IOException {
-        if (isAwaitingClose()) {
-            LOG.debug("{} is now closed: ", this);
-            resourceClosed();
-        } else {
-            remotelyClosed(provider);
-        }
+        processRemoteClose(provider);
     }
 
     @Override
     public void processRemoteClose(AmqpProvider provider) throws IOException {
         if (isAwaitingClose()) {
-            LOG.debug("{} is now closed: ", this);
-            resourceClosed();
+            closeResource(provider, null, true); // Close was expected so ignore any endpoint errors.
         } else {
-            remotelyClosed(provider);
+            closeResource(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()), true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index e7c9b62..e8eebb0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -17,7 +17,9 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -117,6 +119,21 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         }
     }
 
+    @Override
+    public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
+        connectionSession.handleResourceClosure(getProvider(), cause);
+
+        List<AmqpSession> sessionList = new ArrayList<>(sessions.values());
+        for (AmqpSession session : sessionList) {
+            session.handleResourceClosure(provider, cause);
+        }
+
+        List<AmqpTemporaryDestination> tempDestsList = new ArrayList<>(tempDests.values());
+        for (AmqpTemporaryDestination tempDest : tempDestsList) {
+            tempDest.handleResourceClosure(provider, cause);
+        }
+    }
+
     public URI getRemoteURI() {
         return remoteURI;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 7ca27aa..3f25164 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -16,7 +16,9 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.InvalidDestinationException;
@@ -80,6 +82,14 @@ public class AmqpConnectionSession extends AmqpSession {
         builder.buildResource(subscribeRequest);
     }
 
+    @Override
+    public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
+        List<AsyncResult> pending = new ArrayList<>(pendingUnsubs.values());
+        for (AsyncResult unsubscribeRequest : pending) {
+            unsubscribeRequest.onFailure(cause);
+        }
+    }
+
     private static final class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {
 
         public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver, AmqpResourceParent parent) {
@@ -149,12 +159,12 @@ public class AmqpConnectionSession extends AmqpSession {
         }
 
         @Override
-        public void onFailure(Throwable result) {
+        public void onFailure(Throwable cause) {
             DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
             LOG.trace("Failed to reattach to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName());
             pendingUnsubs.remove(subscriptionName);
-            subscriber.resourceClosed();
-            super.onFailure(result);
+            subscriber.closeResource(getProvider(), cause, false);
+            super.onFailure(cause);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 02d9b47..fb818f4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -126,8 +126,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     public void run() {
                         LOG.trace("Consumer {} drain request timed out", getConsumerId());
                         Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
-                        locallyClosed(session.getProvider(), cause);
-                        stopRequest.onFailure(cause);
+                        closeResource(session.getProvider(), cause, false);
                         session.getProvider().pumpToProtonTransport();
                     }
                 }, getDrainTimeout());
@@ -566,7 +565,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     @Override
-    public void handleResourceClosure(AmqpProvider provider, Exception error) {
+    public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
         AmqpConnection connection = session.getConnection();
         AmqpSubscriptionTracker subTracker = connection.getSubTracker();
         JmsConsumerInfo consumerInfo = getResourceInfo();
@@ -576,12 +575,20 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         // When closed we need to release any pending tasks to avoid blocking
 
         if (stopRequest != null) {
-            stopRequest.onSuccess();
+            if (cause == null) {
+                stopRequest.onSuccess();
+            } else {
+                stopRequest.onFailure(cause);
+            }
             stopRequest = null;
         }
 
         if (pullRequest != null) {
-            pullRequest.onSuccess();
+            if (cause == null) {
+                pullRequest.onSuccess();
+            } else {
+                pullRequest.onFailure(cause);
+            }
             pullRequest = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 324fdb9..9b5bc71 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -278,7 +278,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     @Override
-    public void handleResourceClosure(AmqpProvider provider, Exception error) {
+    public void handleResourceClosure(AmqpProvider provider, Throwable error) {
         if (error == null) {
             // TODO: create/use a more specific/appropriate exception type?
             error = new JMSException("Producer closed remotely before message transfer result was notified");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index b0fa936..832aa0d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -761,9 +761,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         // We can't send any more output, so close the transport
                         protonTransport.close_head();
                         fireProviderException(error);
-                        if (connection != null) {
-                            connection.resourceClosed();
-                        }
                     }
                 }
             });
@@ -787,9 +784,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         // We can't send any more output, so close the transport
                         protonTransport.close_head();
                         fireProviderException(new IOException("Transport connection remotely closed."));
-                        if (connection != null) {
-                            connection.resourceClosed();
-                        }
                     }
                 }
             });
@@ -967,10 +961,10 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    void fireResourceClosed(JmsResource resource, Exception ex) {
+    void fireResourceClosed(JmsResource resource, Throwable cause) {
         ProviderListener listener = this.listener;
         if (listener != null) {
-            listener.onResourceClosed(resource, ex);
+            listener.onResourceClosed(resource, cause);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 5f441e9..49b1d7d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -228,15 +228,15 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     }
 
     @Override
-    public void handleResourceClosure(AmqpProvider provider, Exception error) {
+    public void handleResourceClosure(AmqpProvider provider, Throwable error) {
         List<AmqpConsumer> consumerList = new ArrayList<>(consumers.values());
         for (AmqpConsumer consumer : consumerList) {
-            consumer.locallyClosed(provider, error);
+            consumer.handleResourceClosure(provider, error);
         }
 
         List<AmqpProducer> producerList = new ArrayList<>(producers.values());
         for (AmqpProducer producer : producerList) {
-            producer.locallyClosed(provider, error);
+            producer.handleResourceClosure(provider, error);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index d18e95b..f567b09 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -173,14 +173,12 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
     //----- Base class overrides ---------------------------------------------//
 
     @Override
-    public void remotelyClosed(AmqpProvider provider) {
-
-        Exception txnError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+    public void closeResource(AmqpProvider provider, Throwable cause, boolean localClose) {
 
         // Alert any pending operation that the link failed to complete the pending
         // begin / commit / rollback operation.
         if (pendingRequest != null) {
-            pendingRequest.onFailure(txnError);
+            pendingRequest.onFailure(cause);
             pendingRequest = null;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
index 9186eba..c1b8b9e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
@@ -49,14 +49,14 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener {
     }
 
     @Override
-    public void onSessionClosed(Session session, Exception exception) {
+    public void onSessionClosed(Session session, Throwable exception) {
     }
 
     @Override
-    public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
     }
 
     @Override
-    public void onProducerClosed(MessageProducer producer, Exception cause) {
+    public void onProducerClosed(MessageProducer producer, Throwable cause) {
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index ed0cc70..6c667ec 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -88,6 +88,9 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDetach(true, true, true);
             consumer.close();
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -130,7 +133,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
+                public void onConsumerClosed(MessageConsumer consumer, Throwable exception) {
                     consumerClosed.set(true);
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index a92eb67..36e6422 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -917,7 +917,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onProducerClosed(MessageProducer producer, Exception exception) {
+                public void onProducerClosed(MessageProducer producer, Throwable exception) {
                     producerClosed.countDown();
                 }
             });
@@ -1056,7 +1056,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onProducerClosed(MessageProducer producer, Exception exception) {
+                public void onProducerClosed(MessageProducer producer, Throwable exception) {
                     producerClosed.countDown();
                 }
             });
@@ -2070,6 +2070,77 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            session.close();
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            connection.close();
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testAsyncCompletionResetsBytesMessage() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index ec03d5b..85e0261 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1645,7 +1645,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Throwable exception) {
                     sessionClosed.countDown();
                 }
             });
@@ -1772,7 +1772,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Throwable exception) {
                     sessionClosed.countDown();
                 }
             });
@@ -1842,7 +1842,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Throwable exception) {
                     sessionClosed.countDown();
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
index 6877a20..2fa2b9d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -1116,7 +1116,7 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
             final CountDownLatch subscriberClosed = new CountDownLatch(1);
             ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
+                public void onConsumerClosed(MessageConsumer consumer, Throwable exception) {
                     subscriberClosed.countDown();
                 }
             });
@@ -1368,7 +1368,7 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
             final CountDownLatch sessionClosed = new CountDownLatch(1);
             ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Throwable exception) {
                     sessionClosed.countDown();
                 }
             });
@@ -1450,7 +1450,7 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
             final CountDownLatch sessionClosed = new CountDownLatch(1);
             ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Throwable exception) {
                     sessionClosed.countDown();
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 33ce7d3..e91229b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1285,7 +1285,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             Queue queue = session.createQueue("myQueue");
 
             testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 2);
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
 
             // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
             TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
@@ -1311,6 +1311,9 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // and reply with accepted and settled disposition to indicate the rollback succeeded
             testPeer.expectDischarge(txnId, true);
 
+            // Expect the release of the prefetched message from the consumer that was closed.
+            //testPeer.expectDispositionThatIsReleasedAndSettled();
+
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
@@ -1318,13 +1321,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDischarge(txnId, true);
             testPeer.expectClose();
 
-            try {
-                session.rollback();
-                //fail("Consumer should have failed to stop and caused an error on rollback.");
-            } catch (JMSException ex) {
-                // Expected
-            }
-
+            session.rollback();
             connection.close();
 
             testPeer.waitForAllHandlersToComplete(1000);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
index e076455..ff00260 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
@@ -77,15 +77,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Throwable exception) {
             }
 
             @Override
-            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
             }
 
             @Override
-            public void onProducerClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Throwable cause) {
             }
         });
 
@@ -145,15 +145,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Throwable exception) {
             }
 
             @Override
-            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
             }
 
             @Override
-            public void onProducerClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Throwable cause) {
             }
         });
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
index e0939fb..5b5015e 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
@@ -209,15 +209,15 @@ public class FileWatcherDiscoveryTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Throwable exception) {
             }
 
             @Override
-            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
             }
 
             @Override
-            public void onProducerClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Throwable cause) {
             }
         });
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
index dee33f2..78cfa27 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -224,14 +224,14 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti
     }
 
     @Override
-    public void onSessionClosed(Session session, Exception exception) {
+    public void onSessionClosed(Session session, Throwable exception) {
     }
 
     @Override
-    public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
     }
 
     @Override
-    public void onProducerClosed(MessageProducer producer, Exception cause) {
+    public void onProducerClosed(MessageProducer producer, Throwable cause) {
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/88a77984/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 47c7bfc..fff403b 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -360,15 +360,15 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Throwable exception) {
             }
 
             @Override
-            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
             }
 
             @Override
-            public void onProducerClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Throwable cause) {
             }
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org