You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/12/03 00:13:08 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-137

Repository: qpid-jms
Updated Branches:
  refs/heads/master baf15b096 -> 30e54e086


https://issues.apache.org/jira/browse/QPIDJMS-137

Allow a request timeout to be set that fails an attempted resource
creation when the remote peer does not send a response in time.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/30e54e08
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/30e54e08
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/30e54e08

Branch: refs/heads/master
Commit: 30e54e086d226696d930fb8cd00ccd9feae14702
Parents: baf15b0
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Dec 2 18:12:51 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Dec 2 18:12:51 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  72 ++++++++----
 .../jms/provider/amqp/AmqpResourceParent.java   |   5 +
 .../qpid/jms/provider/amqp/AmqpSession.java     |   1 +
 .../provider/amqp/AmqpTransactionContext.java   |   7 +-
 .../amqp/builders/AmqpConnectionBuilder.java    |  19 ++--
 .../amqp/builders/AmqpResourceBuilder.java      |  49 +++++++-
 .../AmqpTemporaryDestinationBuilder.java        |  12 +-
 .../integration/ConnectionIntegrationTest.java  |  25 +++++
 .../jms/integration/SessionIntegrationTest.java | 112 ++++++++++++++++++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 109 +++++++++---------
 10 files changed, 313 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 415db33..30889d3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
@@ -116,8 +117,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private boolean presettleProducers;
     private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
     private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
-    private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
-    private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
     private int channelMax = DEFAULT_CHANNEL_MAX;
     private int idleTimeout = 60000;
     private long sessionOutoingWindow = -1; //Use proton default
@@ -275,8 +274,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
                             closeTimeout = connectionInfo.getCloseTimeout();
                             connectTimeout = connectionInfo.getConnectTimeout();
-                            sendTimeout = connectionInfo.getSendTimeout();
-                            requestTimeout = connectionInfo.getRequestTimeout();
 
                             if (getMaxFrameSize() > 0) {
                                 protonTransport.setMaxFrameSize(getMaxFrameSize());
@@ -916,12 +913,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    private void checkClosed() throws ProviderClosedException {
-        if (closed.get()) {
-            throw new ProviderClosedException("This Provider is already closed");
-        }
-    }
-
     @Override
     public void addChildResource(AmqpResource resource) {
         if (resource instanceof AmqpConnection) {
@@ -1058,11 +1049,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     public long getRequestTimeout() {
-        return requestTimeout;
+        return connection != null ? connection.getResourceInfo().getRequestTimeout() : JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
     }
 
     public long getSendTimeout() {
-        return sendTimeout;
+        return connection != null ? connection.getResourceInfo().getSendTimeout() : JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
     }
 
     public void setPresettle(boolean presettle) {
@@ -1134,6 +1125,55 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         return this.serializer;
     }
 
+    @Override
+    public AmqpProvider getProvider() {
+        return this;
+    }
+
+    /**
+     * Allows a resource to request that its parent resource schedule a future
+     * cancellation of a request and return it a {@link Future} instance that
+     * can be used to cancel the scheduled automatic failure of the request.
+     *
+     * @param request
+     *      The request that should be marked as failed based on configuration.
+     * @param error
+     *      The error to use when failing the pending request.
+     *
+     * @return a {@link ScheduledFuture} that can be stored by the caller.
+     */
+    public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) {
+        if (getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+            return serializer.schedule(new Runnable() {
+
+                @Override
+                public void run() {
+                    request.onFailure(error);
+                    pumpToProtonTransport();
+                }
+
+            }, getRequestTimeout(), TimeUnit.MILLISECONDS);
+        }
+
+        return null;
+    }
+
+    Principal getLocalPrincipal() {
+        if (transport instanceof SSLTransport) {
+            return ((SSLTransport) transport).getLocalPrincipal();
+        }
+
+        return null;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void checkClosed() throws ProviderClosedException {
+        if (closed.get()) {
+            throw new ProviderClosedException("This Provider is already closed");
+        }
+    }
+
     private final class IdleTimeoutCheck implements Runnable {
         @Override
         public void run() {
@@ -1170,14 +1210,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    Principal getLocalPrincipal() {
-        if (transport instanceof SSLTransport) {
-            return ((SSLTransport) transport).getLocalPrincipal();
-        }
-
-        return null;
-    }
-
     private static void setHostname(Sasl sasl, String hostname) {
         // TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
         try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
index 6486719..5213a78 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
@@ -40,4 +40,9 @@ public interface AmqpResourceParent {
      */
     void removeChildResource(AmqpResource resource);
 
+    /**
+     * @return a reference to the root AmqpProvider.
+     */
+    AmqpProvider getProvider();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index f9b461f..eb49c5f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -259,6 +259,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
         getConnection().getProvider().fireProviderException(error);
     }
 
+    @Override
     public AmqpProvider getProvider() {
         return connection.getProvider();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 930fd32..b650892 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -253,7 +253,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         txConsumers.clear();
     }
 
-    //----- Resource Parent event handlers -----------------------------------//
+    //----- Resource Parent implementation -----------------------------------//
 
     @Override
     public void addChildResource(AmqpResource resource) {
@@ -268,4 +268,9 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         // to check if the current TX has failed due to link closed during
         // normal operations.
     }
+
+    @Override
+    public AmqpProvider getProvider() {
+        return session.getProvider();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index cad66a3..2de00f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -18,7 +18,6 @@ package org.apache.qpid.jms.provider.amqp.builders;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
 
-import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -96,16 +95,6 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
     }
 
     @Override
-    protected void handleOpened(AmqpProvider provider) throws IOException {
-        // Initialize the connection properties so that the state of the remote can
-        // be determined, this allows us to check for close pending.
-        getResource().getProperties().initialize(
-            getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
-
-        super.handleOpened(provider);
-    }
-
-    @Override
     protected Connection createEndpoint(JmsConnectionInfo resourceInfo) {
         String hostname = getParent().getVhost();
         if (hostname == null) {
@@ -134,6 +123,14 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
     }
 
     @Override
+    protected void afterOpened() {
+        // Initialize the connection properties so that the state of the remote can
+        // be determined, this allows us to check for close pending.
+        getResource().getProperties().initialize(
+            getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
+    }
+
+    @Override
     protected boolean isClosePending() {
         return getResource().getProperties().isConnectionOpenFailed();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index a6618e7..d8590ef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.provider.amqp.builders;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
 
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
@@ -42,6 +43,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
     private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class);
 
     protected AsyncResult request;
+    protected ScheduledFuture<?> requestTimeoutTask;
     protected TARGET resource;
     protected ENDPOINT endpoint;
     protected final PARENT parent;
@@ -61,7 +63,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
      * @param request
      *      The request that initiated the resource creation.
      */
-    public void buildResource(AsyncResult request) {
+    public void buildResource(final AsyncResult request) {
         this.request = request;
 
         // Create the local end of the manage resource.
@@ -71,6 +73,32 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
         // Create the resource object now
         resource = createResource(parent, resourceInfo, endpoint);
+
+        if (parent.getProvider().getRequestTimeout() > 0) {
+
+            // Attempt to schedule a cancellation of the pending open request, can return
+            // null if there is no configured request timeout.
+            requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() {
+
+                @Override
+                public void onSuccess() {
+                    // Nothing to do here.
+                }
+
+                @Override
+                public void onFailure(Throwable result) {
+                    // We ignore the default error and attempt to coerce a more
+                    // meaningful error from the endpoint.
+                    handleClosed(parent.getProvider());
+                }
+
+                @Override
+                public boolean isComplete() {
+                    return request.isComplete();
+                }
+
+            }, null);
+        }
     }
 
     //----- Event handlers ---------------------------------------------------//
@@ -102,15 +130,20 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
     //----- Standard open and close handlers ---------------------------------//
 
-    protected void handleOpened(AmqpProvider provider) throws IOException {
+    protected final void handleOpened(AmqpProvider provider) {
+
+        // perform any post open processing prior to opened state inspection.
+        afterOpened();
 
         if (isClosePending()) {
             return;
         }
 
-        if (isOpenedEndpointValid()) {
-            afterOpened();
+        if (requestTimeoutTask != null) {
+            requestTimeoutTask.cancel(false);
+        }
 
+        if (isOpenedEndpointValid()) {
             getEndpoint().setContext(resource);
             getParent().addChildResource(resource);
             getRequest().onSuccess();
@@ -125,7 +158,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         }
     }
 
-    protected void handleClosed(AmqpProvider provider) throws IOException {
+    protected final void handleClosed(AmqpProvider provider) {
         // If the resource being built is closed during the creation process
         // then this is always an error.
 
@@ -136,6 +169,10 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
             openError = getOpenAbortException();
         }
 
+        if (requestTimeoutTask != null) {
+            requestTimeoutTask.cancel(false);
+        }
+
         LOG.warn("Open of resource:({}) failed: {}", resourceInfo, openError.getMessage());
 
         // This resource is now terminated.
@@ -193,7 +230,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
     /**
      * Called once an endpoint has been opened and validated to give the subclasses a
      * place to perform any follow-on processing or setup steps before the operation
-     * is deemed to have been completed and success is signalled.
+     * is deemed to have been completed and success is signaled.
      */
     protected void afterOpened() {
         // Nothing to do here.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
index 01f173b..954d9b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
@@ -105,12 +105,14 @@ public class AmqpTemporaryDestinationBuilder extends AmqpResourceBuilder<AmqpTem
 
     @Override
     protected void afterOpened() {
-        // Once our sender is opened we can read the updated name from the target address.
-        String oldDestinationName = resourceInfo.getName();
-        String destinationName = getEndpoint().getRemoteTarget().getAddress();
+        if (!isClosePending()) {
+            // Once our sender is opened we can read the updated name from the target address.
+            String oldDestinationName = resourceInfo.getName();
+            String destinationName = getEndpoint().getRemoteTarget().getAddress();
 
-        resourceInfo.setName(destinationName);
+            resourceInfo.setName(destinationName);
 
-        LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName);
+            LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index a84d260..d56bb62 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -124,6 +124,31 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            ((JmsConnection) connection).setRequestTimeout(500);
+
+            testPeer.expectBegin();
+            // Expect the session, with an immediate link to the transaction coordinator
+            // using a target with the expected capabilities only.
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+            testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null);
+            testPeer.expectDetach(true, false, false);
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("Session create should have failed.");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRemotelyCloseConnectionDuringSessionCreation() throws Exception {
         final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 4d36e8c..bf26f20 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -47,6 +47,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
@@ -214,6 +215,77 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateConsumerFailsWhenLinkRefusalResponseNotSent() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            ((JmsConnection) connection).setRequestTimeout(500);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect a link to a topic node, which we will then refuse
+            SourceMatcher targetMatcher = new SourceMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            testPeer.expectReceiverAttach(notNullValue(), targetMatcher, false, true, true, false, null, null);
+            testPeer.expectDetach(true, false, false);
+
+            try {
+                // Create a consumer, expect it to throw exception due to the link-refusal
+                // even though there is no detach response.
+                session.createConsumer(dest);
+                fail("Consumer creation should have failed when link was refused");
+            } catch(JMSException ex) {
+                // Expected
+                LOG.info("Caught expected error on consumer create: {}", ex.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateBrowserFailsWhenLinkRefusalResponseNotSent() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            ((JmsConnection) connection).setRequestTimeout(500);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String queueName = "myQueue";
+            Queue dest = session.createQueue(queueName);
+
+            testPeer.expectReceiverAttach(notNullValue(), notNullValue(), true, true, true, false, null, null);
+            testPeer.expectDetach(true, false, false);
+
+            try {
+                // Create a QueueBrowser, expect it to throw exception due to the link-refusal
+                // even though there is no detach response.
+                QueueBrowser browser = session.createBrowser(dest);
+                browser.getEnumeration();
+                fail("Consumer creation should have failed when link was refused");
+            } catch(JMSException ex) {
+                // Expected
+                LOG.info("Caught expected error on browser create: {}", ex.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
         doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false);
     }
@@ -421,7 +493,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             String topicName = "myTopic";
             Topic destination = session.createTopic(topicName);
 
-            testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
+            testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
             testPeer.expectDetach(true, true, true);
 
             try {
@@ -446,7 +518,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             String topicName = "myTopic";
             Topic destination = session.createTopic(topicName);
 
-            testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
+            testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
             testPeer.expectDetach(true, true, true);
 
             try {
@@ -897,6 +969,42 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateProducerFailsWhenLinkRefusedNoDetachSent() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            ((JmsConnection) connection).setRequestTimeout(500);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect a link to a topic node, which we will then refuse
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            testPeer.expectSenderAttach(notNullValue(), targetMatcher, true, true, false, 0, null, null);
+            // Expect the detach response to the test peer closing the producer link after refusal.
+            testPeer.expectDetach(true, false, false);
+
+            try {
+                // Create a producer, expect it to throw exception due to the link-refusal
+                session.createProducer(dest);
+                fail("Producer creation should have failed when link was refused");
+            } catch(JMSException ex) {
+                // Expected
+                LOG.info("Caught expected exception on create: {}", ex.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7fc557b..9396e3b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -845,7 +845,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSenderAttach(long creditFlowDelay)
     {
-        expectSenderAttach(notNullValue(), notNullValue(), false, false, creditFlowDelay, null, null);
+        expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null);
     }
 
     public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
@@ -855,10 +855,10 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
     {
-        expectSenderAttach(notNullValue(), targetMatcher, refuseLink, deferAttachResponseWrite, 0, null, null);
+        expectSenderAttach(notNullValue(), targetMatcher, refuseLink, false, deferAttachResponseWrite, 0, null, null);
     }
 
-    public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
+    public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(notNullValue())
@@ -908,53 +908,56 @@ public class TestAmqpPeer implements AutoCloseable
             attachResponseSender.setDeferWrite(true);
         }
 
-        final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+        CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+        composite.add(attachResponseSender);
+        if (refuseLink) {
+            if (!omitDetach) {
+                final DetachFrame detachResponse = new DetachFrame().setClosed(true);
+                if (errorType != null)
+                {
+                    org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+
+                    detachError.setCondition(errorType);
+                    detachError.setDescription(errorMessage);
+
+                    detachResponse.setError(detachError);
+                }
+
+                // The response frame channel will be dynamically set based on the
+                // incoming frame. Using the -1 is an illegal placeholder.
+                final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
+                detachResonseSender.setValueProvider(new ValueProvider() {
+                     @Override
+                     public void setValues() {
+                          detachResonseSender.setChannel(attachMatcher.getActualChannel());
+                          detachResponse.setHandle(attachMatcher.getReceivedHandle());
+                     }
+                });
+
+                composite.add(detachResonseSender);
+            }
+        } else {
+            final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
                 .setIncomingWindow(UnsignedInteger.valueOf(2048))
                 .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
                 .setOutgoingWindow(UnsignedInteger.valueOf(2048))
                 .setLinkCredit(UnsignedInteger.valueOf(100));
 
-        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
-        final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
-        flowFrameSender.setValueProvider(new ValueProvider()
-        {
-            @Override
-            public void setValues()
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
+            flowFrameSender.setValueProvider(new ValueProvider()
             {
-                flowFrameSender.setChannel(attachMatcher.getActualChannel());
-                flowFrame.setHandle(attachMatcher.getReceivedHandle());
-                flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
-            }
-        });
-        flowFrameSender.setSendDelay(creditFlowDelay);
-
-        final DetachFrame detachResponse = new DetachFrame().setClosed(true);
-        if (errorType != null)
-        {
-            org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
-
-            detachError.setCondition(errorType);
-            detachError.setDescription(errorMessage);
-
-            detachResponse.setError(detachError);
-        }
+                @Override
+                public void setValues()
+                {
+                    flowFrameSender.setChannel(attachMatcher.getActualChannel());
+                    flowFrame.setHandle(attachMatcher.getReceivedHandle());
+                    flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
+                }
+            });
 
-        // The response frame channel will be dynamically set based on the
-        // incoming frame. Using the -1 is an illegal placeholder.
-        final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
-        detachResonseSender.setValueProvider(new ValueProvider() {
-             @Override
-             public void setValues() {
-                  detachResonseSender.setChannel(attachMatcher.getActualChannel());
-                  detachResponse.setHandle(attachMatcher.getReceivedHandle());
-             }
-        });
+            flowFrameSender.setSendDelay(creditFlowDelay);
 
-        CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
-        composite.add(attachResponseSender);
-        if (refuseLink) {
-            composite.add(detachResonseSender);
-        } else {
             composite.add(flowFrameSender);
         }
 
@@ -975,7 +978,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectCoordinatorAttach(final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
     {
-        expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, deferAttachResponseWrite, 0, errorType, errorMessage);
+        expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, false, deferAttachResponseWrite, 0, errorType, errorMessage);
     }
 
     public void expectQueueBrowserAttach()
@@ -990,25 +993,25 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher)
     {
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, null, null);
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null);
     }
 
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled)
     {
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, null, null);
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, false, null, null);
     }
 
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
     {
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, deferAttachResponseWrite, null, null);
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, deferAttachResponseWrite, null, null);
     }
 
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite)
     {
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, deferAttachResponseWrite, null, null);
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null);
     }
 
-    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
+    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(linkNameMatcher)
@@ -1057,7 +1060,7 @@ public class TestAmqpPeer implements AutoCloseable
         CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
         composite.add(attachResponseSender);
 
-        if (refuseLink)
+        if (refuseLink && !omitDetach)
         {
             final DetachFrame detachResponse = new DetachFrame().setClosed(true);
             if (errorType != null)
@@ -1072,18 +1075,18 @@ public class TestAmqpPeer implements AutoCloseable
 
             // The response frame channel will be dynamically set based on the
             // incoming frame. Using the -1 is an illegal placeholder.
-            final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
-            detachResonseSender.setValueProvider(new ValueProvider()
+            final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
+            detachResponseSender.setValueProvider(new ValueProvider()
             {
                 @Override
                 public void setValues()
                 {
-                    detachResonseSender.setChannel(attachMatcher.getActualChannel());
+                    detachResponseSender.setChannel(attachMatcher.getActualChannel());
                     detachResponse.setHandle(attachMatcher.getReceivedHandle());
                 }
             });
 
-            composite.add(detachResonseSender);
+            composite.add(detachResponseSender);
         }
 
         attachMatcher.onCompletion(composite);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org