You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/22 21:43:15 UTC

qpid-jms git commit: QPIDJMS-157 Adds support for sendTimeout and requestTimeout in the AMQP provider.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4e963e442 -> e0b5980c0


QPIDJMS-157 Adds support for sendTimeout and requestTimeout in the AMQP
provider.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e0b5980c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e0b5980c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e0b5980c

Branch: refs/heads/master
Commit: e0b5980c077555b0d0a3c356d6d0580ec87c6497
Parents: 4e963e4
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 22 16:42:59 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 22 16:42:59 2016 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    | 109 ++++++---
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  44 +++-
 .../amqp/AmqpTransactionCoordinator.java        |  20 ++
 .../amqp/builders/AmqpConnectionBuilder.java    |   5 +
 .../amqp/builders/AmqpResourceBuilder.java      |  31 ++-
 .../org/apache/qpid/jms/JmsConnectionTest.java  |   2 +-
 .../FailedConnectionsIntegrationTest.java       |  29 ++-
 .../integration/ProducerIntegrationTest.java    | 152 +++++++++++-
 .../QueueBrowserIntegrationTest.java            |  20 +-
 .../jms/integration/SessionIntegrationTest.java |   7 +-
 .../TransactionsIntegrationTest.java            | 242 +++++++++++++++++++
 .../failover/FailoverIntegrationTest.java       |   6 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  66 ++++-
 13 files changed, 644 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 1282138..b582052 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -22,11 +22,14 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 
 import javax.jms.JMSException;
 
+import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
@@ -57,8 +60,8 @@ public class AmqpFixedProducer extends AmqpProducer {
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
     private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
-    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
-    private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>();
+    private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
+    private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
     private byte[] encodeBuffer = new byte[1024 * 8];
     private boolean presettle = false;
 
@@ -73,7 +76,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     @Override
     public void close(AsyncResult request) {
         // If any sends are held we need to wait for them to complete.
-        if (!pendingSends.isEmpty()) {
+        if (!blocked.isEmpty()) {
             this.closeRequest = request;
             return;
         }
@@ -83,15 +86,20 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
-        // TODO - Handle the case where remote has no credit which means we can't send to it.
-        //        We need to hold the send until remote credit becomes available but we should
-        //        also have a send timeout option and filter timed out sends.
         if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");
             // Once a message goes into a held mode we no longer can send it async, so
             // we clear the async flag if set to avoid the sender never getting notified.
             envelope.setSendAsync(false);
-            this.pendingSends.addLast(new PendingSend(envelope, request));
+
+            InFlightSend send = new InFlightSend(envelope, request);
+
+            if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
+                send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
+                    send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage()));
+            }
+
+            blocked.addLast(send);
             return false;
         } else {
             doSend(envelope, request);
@@ -135,12 +143,20 @@ public class AmqpFixedProducer extends AmqpProducer {
         if (presettle) {
             delivery.settle();
         } else {
-            pending.add(delivery);
+            sent.add(delivery);
             getEndpoint().advance();
         }
 
         if (envelope.isSendAsync() || presettle) {
             request.onSuccess();
+        } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) {
+            InFlightSend send = new InFlightSend(envelope, request);
+
+            send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
+                send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage()));
+
+            // Update context so the incoming disposition can cancel any pending timeout
+            delivery.setContext(send);
         }
     }
 
@@ -173,12 +189,12 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     public void processFlowUpdates(AmqpProvider provider) throws IOException {
-        if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
-            while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
+        if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) {
+            while (getEndpoint().getCredit() > 0 && !blocked.isEmpty()) {
                 LOG.trace("Dispatching previously held send");
-                PendingSend held = pendingSends.pop();
+                InFlightSend held = blocked.pop();
                 try {
-                    doSend(held.envelope, held.request);
+                    doSend(held.envelope, held);  // TODO - Cancel timeout and reset after dispatch ?
                 } catch (JMSException e) {
                     throw IOExceptionSupport.create(e);
                 }
@@ -186,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer {
         }
 
         // Once the pending sends queue is drained we can propagate the close request.
-        if (pendingSends.isEmpty() && isAwaitingClose()) {
+        if (blocked.isEmpty() && isAwaitingClose()) {
             super.close(closeRequest);
         }
 
@@ -197,7 +213,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
         List<Delivery> toRemove = new ArrayList<Delivery>();
 
-        for (Delivery delivery : pending) {
+        for (Delivery delivery : sent) {
             DeliveryState state = delivery.getRemoteState();
             if (state == null) {
                 continue;
@@ -251,7 +267,7 @@ public class AmqpFixedProducer extends AmqpProducer {
             delivery.settle();
         }
 
-        pending.removeAll(toRemove);
+        sent.removeAll(toRemove);
 
         super.processDeliveryUpdates(provider);
     }
@@ -275,22 +291,15 @@ public class AmqpFixedProducer extends AmqpProducer {
         return presettle;
     }
 
+    public long getSendTimeout() {
+        return getParent().getProvider().getSendTimeout();
+    }
+
     @Override
     public String toString() {
         return "AmqpFixedProducer { " + getProducerId() + " }";
     }
 
-    private static class PendingSend {
-
-        public JmsOutboundMessageDispatch envelope;
-        public AsyncResult request;
-
-        public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
-            this.envelope = envelope;
-            this.request = request;
-        }
-    }
-
     @Override
     public void remotelyClosed(AmqpProvider provider) {
         super.remotelyClosed(provider);
@@ -301,7 +310,7 @@ public class AmqpFixedProducer extends AmqpProducer {
             ex = new JMSException("Producer closed remotely before message transfer result was notified");
         }
 
-        for (Delivery delivery : pending) {
+        for (Delivery delivery : sent) {
             try {
                 AsyncResult request = (AsyncResult) delivery.getContext();
 
@@ -316,6 +325,50 @@ public class AmqpFixedProducer extends AmqpProducer {
             }
         }
 
-        pending.clear();
+        sent.clear();
+    }
+
+    //----- Class used to manage held sends ----------------------------------//
+
+    private class InFlightSend implements AsyncResult {
+
+        public final JmsOutboundMessageDispatch envelope;
+        public final AsyncResult request;
+
+        // Timeout task for this send, or null when none is scheduled.
+        public ScheduledFuture<?> requestTimeout;
+
+        public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
+            this.envelope = envelope;
+            this.request = request;
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            release();
+            request.onFailure(cause);
+        }
+
+        @Override
+        public void onSuccess() {
+            release();
+            request.onSuccess();
+        }
+
+        @Override
+        public boolean isComplete() {
+            return request.isComplete();
+        }
+
+        // Cancels any scheduled timeout and drops this send from the blocked queue
+        // before the wrapped request is notified of the outcome.
+        private void release() {
+            if (requestTimeout != null) {
+                requestTimeout.cancel(false);
+                requestTimeout = null;
+            }
+
+            blocked.remove(this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 4cb1028..5520ec4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -768,39 +768,57 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                 switch (protonEvent.getType()) {
                     case CONNECTION_REMOTE_CLOSE:
                         amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
-                        amqpEventSink.processRemoteClose(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteClose(this);
+                        }
                         break;
                     case CONNECTION_REMOTE_OPEN:
                         amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
-                        amqpEventSink.processRemoteOpen(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteOpen(this);
+                        }
                         break;
                     case SESSION_REMOTE_CLOSE:
                         amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
-                        amqpEventSink.processRemoteClose(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteClose(this);
+                        }
                         break;
                     case SESSION_REMOTE_OPEN:
                         amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
-                        amqpEventSink.processRemoteOpen(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteOpen(this);
+                        }
                         break;
                     case LINK_REMOTE_CLOSE:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                        amqpEventSink.processRemoteClose(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteClose(this);
+                        }
                         break;
                     case LINK_REMOTE_DETACH:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                        amqpEventSink.processRemoteDetach(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteDetach(this);
+                        }
                         break;
                     case LINK_REMOTE_OPEN:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                        amqpEventSink.processRemoteOpen(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processRemoteOpen(this);
+                        }
                         break;
                     case LINK_FLOW:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                        amqpEventSink.processFlowUpdates(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processFlowUpdates(this);
+                        }
                         break;
                     case DELIVERY:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                        amqpEventSink.processDeliveryUpdates(this);
+                        if (amqpEventSink != null) {
+                            amqpEventSink.processDeliveryUpdates(this);
+                        }
                         break;
                     default:
                         break;
@@ -1137,13 +1155,15 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
      *
      * @param request
      *      The request that should be marked as failed based on configuration.
+     * @param timeout
+     *      The time to wait before marking the request as failed.
      * @param error
      *      The error to use when failing the pending request.
      *
      * @return a {@link ScheduledFuture} that can be stored by the caller.
      */
-    public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) {
-        if (getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+    public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final Exception error) {
+        if (timeout != JmsConnectionInfo.INFINITE) {
             return serializer.schedule(new Runnable() {
 
                 @Override
@@ -1152,7 +1172,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     pumpToProtonTransport();
                 }
 
-            }, getRequestTimeout(), TimeUnit.MILLISECONDS);
+            }, timeout, TimeUnit.MILLISECONDS);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index b06f686..f0ddf96 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -18,11 +18,14 @@ package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
 import java.nio.BufferOverflowException;
+import java.util.concurrent.ScheduledFuture;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.TransactionRolledBackException;
 
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.AsyncResult;
@@ -57,6 +60,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
 
     private Delivery pendingDelivery;
     private AsyncResult pendingRequest;
+    private ScheduledFuture<?> pendingTimeout;
 
     public AmqpTransactionCoordinator(JmsSessionInfo resourceInfo, Sender endpoint, AmqpResourceParent parent) {
         super(resourceInfo, endpoint, parent);
@@ -94,6 +98,11 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
                 pendingDelivery.settle();
                 pendingRequest = null;
                 pendingDelivery = null;
+
+                if (pendingTimeout != null) {
+                    pendingTimeout.cancel(false);
+                    pendingTimeout = null;
+                }
             }
 
             super.processDeliveryUpdates(provider);
@@ -120,6 +129,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
         pendingDelivery.setContext(txId);
         pendingRequest = request;
 
+        scheduleTimeoutIfNeeded("Timed out waiting for declare of new TX.");
+
         sendTxCommand(message);
     }
 
@@ -154,6 +165,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
         pendingDelivery.setContext(txId);
         pendingRequest = request;
 
+        scheduleTimeoutIfNeeded("Timed out waiting for discharge of TX.");
+
         sendTxCommand(message);
     }
 
@@ -190,6 +203,13 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
 
     //----- Internal implementation ------------------------------------------//
 
+    private void scheduleTimeoutIfNeeded(String cause) {  // retains the future so the timeout can be cancelled on reply
+        AmqpProvider provider = getParent().getProvider();
+        if (provider.getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+            pendingTimeout = provider.scheduleRequestTimeout(pendingRequest, provider.getRequestTimeout(), new JmsOperationTimedOutException(cause));
+        }
+    }
+
     private void sendTxCommand(Message message) throws IOException {
         int encodedSize = 0;
         byte[] buffer = OUTBOUND_BUFFER;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index 1f74682..71a4dd4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -134,4 +134,9 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
     protected boolean isClosePending() {
         return getResource().getProperties().isConnectionOpenFailed();
     }
+
+    @Override
+    protected long getRequestTimeout() {
+        return getParent().getProvider().getConnectTimeout(); // a connection open is timed by connectTimeout
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index c2353e8..229e61f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -19,6 +19,8 @@ package org.apache.qpid.jms.provider.amqp.builders;
 import java.io.IOException;
 import java.util.concurrent.ScheduledFuture;
 
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
@@ -74,7 +76,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         // Create the resource object now
         resource = createResource(parent, resourceInfo, endpoint);
 
-        if (parent.getProvider().getRequestTimeout() > 0) {
+        if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
 
             // Attempt to schedule a cancellation of the pending open request, can return
             // null if there is no configured request timeout.
@@ -87,9 +89,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
                 @Override
                 public void onFailure(Throwable result) {
-                    // We ignore the default error and attempt to coerce a more
-                    // meaningful error from the endpoint.
-                    handleClosed(parent.getProvider());
+                    handleClosed(parent.getProvider(), result);
                 }
 
                 @Override
@@ -97,7 +97,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
                     return request.isComplete();
                 }
 
-            }, null);
+            }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out"));
         }
     }
 
@@ -110,7 +110,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
     @Override
     public void processRemoteClose(AmqpProvider provider) throws IOException {
-        handleClosed(provider);
+        handleClosed(provider, null);
     }
 
     @Override
@@ -158,13 +158,15 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         }
     }
 
-    protected final void handleClosed(AmqpProvider provider) {
+    protected final void handleClosed(AmqpProvider provider, Throwable cause) {
         // If the resource being built is closed during the creation process
         // then this is always an error.
 
-        Exception openError;
+        Throwable openError;
         if (hasRemoteError()) {
             openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+        } else if (cause != null) {
+            openError = cause;
         } else {
             openError = getOpenAbortException();
         }
@@ -244,11 +246,24 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
      * When aborting the open operation, and there isn't an error condition,
      * provided by the peer, the returned exception will be used instead.
      * A subclass may override this method to provide alternative behavior.
+     *
+     * @return an Exception that describes the open failure for this resource.
      */
     protected Exception getOpenAbortException() {
         return new IOException("Open failed unexpectedly.");
     }
 
+    /**
+     * Returns the configured time before the open of the resource is considered
+     * to have failed.  Subclasses can override this method to provide a value more
+     * appropriate to the resource being built.
+     *
+     * @return the configured timeout before the open of the resource fails.
+     */
+    protected long getRequestTimeout() {
+        return getParent().getProvider().getRequestTimeout();
+    }
+
     //----- Public access methods for the managed resources ------------------//
 
     public ENDPOINT getEndpoint() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 6dabcca..bf5bbcf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -68,7 +68,7 @@ public class JmsConnectionTest {
     @Test(timeout=30000, expected=JMSException.class)
     public void testJmsConnectionThrowsJMSExceptionProviderStartFails() throws JMSException, IllegalStateException, IOException {
         provider.getConfiguration().setFailOnStart(true);
-        new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        try (JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);) {}
     }
 
     @Test(timeout=30000)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
index b9c44af..556ae64 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
@@ -34,6 +34,7 @@ import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -71,6 +72,24 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testConnectThrowsTimedOutExceptioWhenResponseNotSent() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.expectSaslAnonymousConnect(true);
+            testPeer.expectClose();
+            try {
+                establishAnonymousConnecton(testPeer, true, "jms.connectTimeout=500");
+                fail("Should have thrown JmsOperationTimedOutException");
+            } catch (JmsOperationTimedOutException jmsEx) {
+                // Expected: the connect attempt gave up after the configured jms.connectTimeout.
+            } catch (Exception ex) {
+                fail("Should have thrown JmsOperationTimedOutException: " + ex);
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testConnectWithNotFoundErrorThrowsJMSEWhenInvalidContainerHintNotPresent() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             testPeer.rejectConnect(AmqpError.NOT_FOUND, "Virtual Host does not exist", null);
@@ -158,8 +177,16 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
     }
 
     Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId) throws JMSException {
+        return establishAnonymousConnecton(testPeer, setClientId, null);
+    }
+
+    Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId, String connectionQuery) throws JMSException {
 
-        final String remoteURI = "amqp://localhost:" + testPeer.getServerPort();
+        String remoteURI = "amqp://localhost:" + testPeer.getServerPort();
+
+        if (connectionQuery != null && !connectionQuery.isEmpty()) {
+            remoteURI += "?" + connectionQuery;
+        }
 
         ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
         Connection connection = factory.createConnection();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 2dd723b..de1e6b3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -49,6 +49,7 @@ import javax.jms.TextMessage;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
@@ -88,7 +89,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             MessageProducer producer = session.createProducer(queue);
 
             testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
+
             producer.close();
+            connection.close();
 
             testPeer.waitForAllHandlersToComplete(1000);
         }
@@ -116,6 +120,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             TextMessage message = session.createTextMessage(text);
             producer.send(message);
@@ -124,6 +129,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             message.setText(text + text);
             assertEquals(text + text, message.getText());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -147,12 +154,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage();
 
             producer.send(message);
             assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -178,6 +188,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage();
             message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
@@ -187,6 +198,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -219,6 +232,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
@@ -228,6 +242,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             assertEquals("Should have had JMSDestination set", queue, message.getJMSDestination());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -263,11 +279,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
             producer.send(message);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -301,14 +320,17 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
             assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp());
 
             producer.setDisableMessageTimestamp(true);
-
             producer.send(message);
+
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
 
             assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp());
@@ -350,11 +372,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
             producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -406,12 +431,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
             message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl);
 
             producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
@@ -441,6 +469,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage();
 
@@ -450,6 +479,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -481,6 +512,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage();
 
@@ -490,6 +522,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             assertEquals(priority, message.getJMSPriority());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -522,6 +556,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
@@ -533,7 +568,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             assertNotNull("JMSMessageID should be set", jmsMessageID);
             assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
 
-            //Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+            connection.close();
+
+            // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
             testPeer.waitForAllHandlersToComplete(1000);
             Object receivedMessageId = propsMatcher.getReceivedMessageId();
 
@@ -571,6 +608,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
@@ -582,7 +620,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             assertNotNull("JMSMessageID should be set", jmsMessageID);
             assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
 
-            //Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally
+            connection.close();
+
+            // Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally
             testPeer.waitForAllHandlersToComplete(1000);
 
             Object receivedMessageId = propsMatcher.getReceivedMessageId();
@@ -621,6 +661,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
@@ -632,7 +673,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             assertNotNull("JMSMessageID should be set", jmsMessageID);
             assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
 
-            //Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
+            connection.close();
+
+            // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
             testPeer.waitForAllHandlersToComplete(1000);
 
             Object receivedMessageId = propsMatcher.getReceivedMessageId();
@@ -687,6 +730,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             Message message = session.createTextMessage(text);
 
@@ -704,6 +748,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             assertNull("JMSMessageID should be null", message.getJMSMessageID());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
@@ -778,6 +824,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             // response, simply remotely close the producer instead.
             testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
             testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+            testPeer.expectClose();
 
             Queue queue = session.createQueue("myQueue");
             final MessageProducer producer = session.createProducer(queue);
@@ -793,6 +840,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
                 assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -860,10 +909,89 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.expectSenderAttach(100);
 
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
 
             MessageProducer producer = session.createProducer(queue);
 
             producer.send(message);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendWhenLinkCreditIsZeroAndTimeout() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            Message message = session.createTextMessage("text");
+
+            // Expect the producer to attach. Don't send any credit so that the client will
+            // block on a send and we can test our timeouts.
+            testPeer.expectSenderAttachWithoutGrantingCredit();
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            try {
+                producer.send(message);
+                fail("Send should time out.");
+            } catch (JmsSendTimedOutException jmsEx) {
+                LOG.info("Caught expected error: {}", jmsEx.getMessage());
+            } catch (Throwable error) { // chain the cause so the unexpected failure keeps its stack trace
+                throw new AssertionError("Send should time out, but got: " + error.getMessage(), error);
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendTimesOutWhenNoDispostionArrives() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            Message message = session.createTextMessage("text");
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit. It should then send
+            // a transfer that we deliberately never respond to, which should cause the
+            // send operation to time out.
+            testPeer.expectSenderAttach(100);
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            try {
+                producer.send(message);
+                fail("Send should time out.");
+            } catch (JmsSendTimedOutException jmsEx) {
+                LOG.info("Caught expected error: {}", jmsEx.getMessage());
+            } catch (Throwable error) { // chain the cause so the unexpected failure keeps its stack trace
+                throw new AssertionError("Send should time out, but got: " + error.getMessage(), error);
+            }
+
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -915,6 +1043,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             Message message = session.createTextMessage("content");
 
             testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true);
+            testPeer.expectClose();
 
             assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
 
@@ -926,6 +1055,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
                 // Expected
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
@@ -967,13 +1098,12 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
     private void doAsyncSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
 
             final CountDownLatch asyncError = new CountDownLatch(1);
 
-            JmsConnection jmsConnection = (JmsConnection) connection;
-            jmsConnection.setForceAsyncSend(true);
-            jmsConnection.setExceptionListener(new ExceptionListener() {
+            connection.setForceAsyncSend(true);
+            connection.setExceptionListener(new ExceptionListener() {
 
                 @Override
                 public void onException(JMSException exception) {
@@ -1011,6 +1141,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             assertTrue("Should get a non-fatal error", asyncError.await(10, TimeUnit.SECONDS));
 
             testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
 
             try {
                 producer.send(message);
@@ -1019,6 +1150,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
                 fail("No expected exception for this send.");
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
@@ -1078,7 +1211,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             producer.send(session.createMessage());
 
             testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
+
             producer.close();
+            connection.close();
 
             testPeer.waitForAllHandlersToComplete(1000);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index b13fa00..b041f51 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -146,11 +146,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
         final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.start();
 
-            JmsConnection jmsConnection = (JmsConnection) connection;
-            jmsConnection.getPrefetchPolicy().setAll(1);
+            connection.getPrefetchPolicy().setAll(1);
 
             testPeer.expectBegin();
 
@@ -305,11 +304,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=30000)
     public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.start();
 
-            JmsConnection jmsConnection = (JmsConnection) connection;
-            jmsConnection.getPrefetchPolicy().setAll(0);
+            connection.getPrefetchPolicy().setAll(0);
 
             testPeer.expectBegin();
 
@@ -333,11 +331,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=30000)
     public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.start();
 
-            JmsConnection jmsConnection = (JmsConnection) connection;
-            jmsConnection.getPrefetchPolicy().setAll(0);
+            connection.getPrefetchPolicy().setAll(0);
 
             testPeer.expectBegin();
 
@@ -365,11 +362,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
         DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.start();
 
-            JmsConnection jmsConnection = (JmsConnection) connection;
-            jmsConnection.getPrefetchPolicy().setAll(0);
+            connection.getPrefetchPolicy().setAll(0);
 
             testPeer.expectBegin();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 415ffb1..7275cac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -56,6 +56,7 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -243,7 +244,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 // even though there is no detach response.
                 session.createConsumer(dest);
                 fail("Consumer creation should have failed when link was refused");
-            } catch(JMSException ex) {
+            } catch(JmsOperationTimedOutException ex) {
                 // Expected
                 LOG.info("Caught expected error on consumer create: {}", ex.getMessage());
             }
@@ -276,7 +277,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 QueueBrowser browser = session.createBrowser(dest);
                 browser.getEnumeration();
                 fail("Consumer creation should have failed when link was refused");
-            } catch(JMSException ex) {
+            } catch(JmsOperationTimedOutException ex) {
                 // Expected
                 LOG.info("Caught expected error on browser create: {}", ex.getMessage());
             }
@@ -995,7 +996,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 // Create a producer, expect it to throw exception due to the link-refusal
                 session.createProducer(dest);
                 fail("Producer creation should have failed when link was refused");
-            } catch(JMSException ex) {
+            } catch(JmsOperationTimedOutException ex) {
                 // Expected
                 LOG.info("Caught expected exception on create: {}", ex.getMessage());
             }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 2ef7046..dfd2f1c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -39,6 +39,7 @@ import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -146,6 +147,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
             testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             try {
                 session.commit();
@@ -153,6 +156,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             } catch (TransactionRolledBackException jmsTxRb) {
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -225,9 +230,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txState.setOutcome(new Accepted());
 
             testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             producer.send(session.createMessage());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -300,9 +309,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txState.setOutcome(new Accepted());
 
             testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             producer.send(session.createMessage());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -410,6 +423,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 messageConsumer.close();
             }
 
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -450,9 +468,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
             testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             session.commit();
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -493,9 +515,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
             testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             session.rollback();
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -537,9 +563,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txState.setOutcome(new Accepted());
 
             testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             producer.send(session.createMessage());
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -634,8 +664,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
             }
 
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
             session.rollback();
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -705,8 +740,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // Expect the consumer to be 'started' again as rollback completes
             testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
 
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
             session.rollback();
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -780,9 +820,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             // Expect the consumer to be 'started' again as rollback completes
             testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             session.rollback();
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -814,9 +858,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
             testPeer.expectLinkFlow();
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             session.createConsumer(queue);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -835,6 +883,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, false);
             testPeer.expectCoordinatorAttach();
             testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 
@@ -845,6 +895,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected TransactionRolledBackException");
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -863,6 +915,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, true);
             testPeer.expectCoordinatorAttach();
             testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 
@@ -873,6 +927,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected JMSException");
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -911,12 +967,18 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.expectCoordinatorAttach();
             testPeer.expectDeclare(txnId);
 
+            // Expect that the session TX will rollback on close.
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
             try {
                 session.commit();
                 fail("Commit operation should have failed.");
             } catch (TransactionRolledBackException jmsTxRb) {
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -960,12 +1022,192 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.expectCoordinatorAttach();
             testPeer.expectDeclare(txnId);
 
+            // Expect that the session TX will rollback on close.
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
             try {
                 session.commit();
                 fail("Commit operation should have failed.");
             } catch (TransactionRolledBackException jmsTxRb) {
             }
 
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testSessionCreateFailsOnDeclareTimeout() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclareButDoNotRespond();
+            testPeer.expectClose();
+            // The declare goes unanswered, so createSession is expected to hit the request timeout set above.
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("Should have timed out waiting for declare.");
+            } catch (JmsOperationTimedOutException jmsEx) { // expected outcome
+            } catch (Exception error) { // Exception, not Throwable: the AssertionError from fail() above must propagate
+                LOG.error("Caught -> ", error);
+                fail("Should have caught an timed out exception:");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            // Closing the session should roll back the TX (discharge with fail=true); the peer never answers it.
+            testPeer.expectDischargeButDoNotRespond(txnId, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.close();
+                fail("Should have timed out waiting for discharge.");
+            } catch (JmsOperationTimedOutException jmsEx) { // expected outcome
+            } catch (Exception error) { // Exception, not Throwable: the AssertionError from fail() above must propagate
+                LOG.error("Caught -> ", error);
+                fail("Should have caught an timed out exception:");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testTransactionRolledBackTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            // The explicit rollback sends a discharge with fail=true that the peer never answers.
+            testPeer.expectDischargeButDoNotRespond(txnId, true);
+
+            // Session should throw from the rollback and then try and recover.
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.rollback();
+                fail("Should have timed out waiting for discharge.");
+            } catch (JmsOperationTimedOutException jmsEx) { // expected outcome
+            } catch (Exception error) { // Exception, not Throwable: the AssertionError from fail() above must propagate
+                LOG.error("Caught -> ", error);
+                fail("Should have caught an timed out exception:");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testTransactionCommitTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            // The commit sends a discharge with fail=false that the peer never answers.
+            testPeer.expectDischargeButDoNotRespond(txnId, false);
+
+            // Session should throw from the commit and then try and recover.
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.commit();
+                fail("Should have timed out waiting for discharge.");
+            } catch (JmsOperationTimedOutException jmsEx) { // expected outcome
+            } catch (Exception error) { // Exception, not Throwable: the AssertionError from fail() above must propagate
+                LOG.error("Caught -> ", error);
+                fail("Should have caught an timed out exception:");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testTransactionCommitTimesOutAndNoNextBeginTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            // The commit sends a discharge with fail=false that the peer never answers.
+            testPeer.expectDischargeButDoNotRespond(txnId, false);
+
+            // Session should throw from the commit and then try and recover, but the next declare goes unanswered too.
+            testPeer.expectDeclareButDoNotRespond();
+            testPeer.expectClose();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.commit();
+                fail("Should have timed out waiting for discharge.");
+            } catch (JmsOperationTimedOutException jmsEx) { // expected outcome
+            } catch (Exception error) { // Exception, not Throwable: the AssertionError from fail() above must propagate
+                LOG.error("Caught -> ", error);
+                fail("Should have caught an timed out exception:");
+            }
+
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 6b6dcb5..64af655 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -868,7 +868,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             testPeer.dropAfterLastHandler();
 
             final JmsConnection connection = establishAnonymousConnecton(
-                "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+                "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
                 testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
 
@@ -925,7 +925,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             testPeer.dropAfterLastHandler();
 
             final JmsConnection connection = establishAnonymousConnecton(
-                "jms.sendTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+                "jms.sendTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
                 testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
 
@@ -986,7 +986,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             testPeer.dropAfterLastHandler();
 
             final JmsConnection connection = establishAnonymousConnecton(
-                "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+                "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
                 testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index d4b304e..7ffdb2e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -20,11 +20,11 @@ package org.apache.qpid.jms.test.testpeer;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -419,7 +419,8 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void expectSaslConnect(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
-                                    Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher)
+                                  Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher,
+                                  Matcher<?> hostnameMatcher, boolean deferOpened)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
@@ -467,12 +468,10 @@ public class TestAmqpPeer implements AutoCloseable
             open.setProperties(serverProperties);
         }
 
-        OpenMatcher openMatcher = new OpenMatcher()
-            .withContainerId(notNullValue(String.class))
-            .onCompletion(new FrameSender(
-                    this, FrameType.AMQP, 0,
-                    open,
-                    null));
+        OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class));
+        if (!deferOpened) {
+            openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null));
+        }
 
         if (desiredCapabilities != null)
         {
@@ -515,7 +514,7 @@ public class TestAmqpPeer implements AutoCloseable
 
         Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
 
-        expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null);
+        expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
     }
 
     public void expectSaslExternalConnect()
@@ -525,7 +524,7 @@ public class TestAmqpPeer implements AutoCloseable
             throw new IllegalStateException("need-client-cert must be enabled on the test peer");
         }
 
-        expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null);
+        expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, false);
     }
 
     public void expectSaslAnonymousConnect()
@@ -533,6 +532,11 @@ public class TestAmqpPeer implements AutoCloseable
         expectSaslAnonymousConnect(null, null);
     }
 
+    public void expectSaslAnonymousConnect(boolean deferOpened)
+    {   // deferOpened is handed to expectSaslConnect, which only attaches the Open reply FrameSender when it is false.
+        expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, deferOpened);
+    }
+
     public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher)
     {
         expectSaslAnonymousConnect(idleTimeoutMatcher, hostnameMatcher, null, null);
@@ -540,7 +544,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, Matcher<?> propertiesMatcher, Map<Symbol, Object> serverProperties)
     {
-        expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher);
+        expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher, false);
     }
 
     public void expectFailingSaslConnect(Symbol[] serverMechs, Symbol clientSelectedMech)
@@ -844,9 +848,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectSenderAttach(notNullValue(), false, false);
     }
 
+    public void expectSenderAttachWithoutGrantingCredit()
+    {
+        expectSenderAttach(notNullValue(), notNullValue(), false, false, false, 0, 0, null, null); // creditFlowDelay = 0, creditAmount = 0
+    }
+
     public void expectSenderAttach(long creditFlowDelay)
     {
-        expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null);
+        expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, 100, null, null);
     }
 
     public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
@@ -861,6 +870,11 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
     {
+        expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, 100, errorType, errorMessage);
+    }
+
+    public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
+    {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(notNullValue())
                 .withHandle(notNullValue())
@@ -942,7 +956,7 @@ public class TestAmqpPeer implements AutoCloseable
                 .setIncomingWindow(UnsignedInteger.valueOf(2048))
                 .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
                 .setOutgoingWindow(UnsignedInteger.valueOf(2048))
-                .setLinkCredit(UnsignedInteger.valueOf(100));
+                .setLinkCredit(UnsignedInteger.valueOf(creditAmount));
 
             // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
             final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
@@ -1476,6 +1490,11 @@ public class TestAmqpPeer implements AutoCloseable
         return payloadData.encode();
     }
 
+    public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher)
+    {   // Unlike expectTransfer(Matcher<Binary>), passes false, null, false where that overload passes true, new Accepted(), true.
+        expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false);
+    }
+
     public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
     {
         expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
@@ -1538,6 +1557,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
     }
 
+    public void expectDeclareButDoNotRespond()
+    {
+        TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+        declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+        // Expect a transfer carrying a Declare; unlike expectDeclare(txnId), no Declared state is supplied (null is passed).
+        expectTransfer(declareMatcher, nullValue(), false, false, null, false);
+    }
+
     public void expectDischarge(Binary txnId, boolean dischargeState) {
         expectDischarge(txnId, dischargeState, new Accepted());
     }
@@ -1555,6 +1582,19 @@ public class TestAmqpPeer implements AutoCloseable
         expectTransfer(dischargeMatcher, nullValue(), false, responseState, true);
     }
 
+    public void expectDischargeButDoNotRespond(Binary txnId, boolean dischargeState) {
+        // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+        // but, as the method name says, do not reply: no response state is supplied (null is passed).
+        Discharge discharge = new Discharge();
+        discharge.setFail(dischargeState);
+        discharge.setTxnId(txnId);
+
+        TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+        dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+
+        expectTransfer(dischargeMatcher, nullValue(), false, false, null, true); // NOTE(review): last arg is true here but false in expectDeclareButDoNotRespond - confirm intended
+    }
+
     public void remotelyCloseLastCoordinatorLink()
     {
         remotelyCloseLastCoordinatorLink(true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org