You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/12/03 00:13:08 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-137
Repository: qpid-jms
Updated Branches:
refs/heads/master baf15b096 -> 30e54e086
https://issues.apache.org/jira/browse/QPIDJMS-137
Allow a request timeout to be set that will fail an attempted resource
create when a response is not sent in time.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/30e54e08
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/30e54e08
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/30e54e08
Branch: refs/heads/master
Commit: 30e54e086d226696d930fb8cd00ccd9feae14702
Parents: baf15b0
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Dec 2 18:12:51 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Dec 2 18:12:51 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 72 ++++++++----
.../jms/provider/amqp/AmqpResourceParent.java | 5 +
.../qpid/jms/provider/amqp/AmqpSession.java | 1 +
.../provider/amqp/AmqpTransactionContext.java | 7 +-
.../amqp/builders/AmqpConnectionBuilder.java | 19 ++--
.../amqp/builders/AmqpResourceBuilder.java | 49 +++++++-
.../AmqpTemporaryDestinationBuilder.java | 12 +-
.../integration/ConnectionIntegrationTest.java | 25 +++++
.../jms/integration/SessionIntegrationTest.java | 112 ++++++++++++++++++-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 109 +++++++++---------
10 files changed, 313 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 415db33..30889d3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
@@ -116,8 +117,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private boolean presettleProducers;
private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
- private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
- private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
private int channelMax = DEFAULT_CHANNEL_MAX;
private int idleTimeout = 60000;
private long sessionOutoingWindow = -1; //Use proton default
@@ -275,8 +274,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
closeTimeout = connectionInfo.getCloseTimeout();
connectTimeout = connectionInfo.getConnectTimeout();
- sendTimeout = connectionInfo.getSendTimeout();
- requestTimeout = connectionInfo.getRequestTimeout();
if (getMaxFrameSize() > 0) {
protonTransport.setMaxFrameSize(getMaxFrameSize());
@@ -916,12 +913,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
- private void checkClosed() throws ProviderClosedException {
- if (closed.get()) {
- throw new ProviderClosedException("This Provider is already closed");
- }
- }
-
@Override
public void addChildResource(AmqpResource resource) {
if (resource instanceof AmqpConnection) {
@@ -1058,11 +1049,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
public long getRequestTimeout() {
- return requestTimeout;
+ return connection != null ? connection.getResourceInfo().getRequestTimeout() : JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
}
public long getSendTimeout() {
- return sendTimeout;
+ return connection != null ? connection.getResourceInfo().getSendTimeout() : JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
}
public void setPresettle(boolean presettle) {
@@ -1134,6 +1125,55 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
return this.serializer;
}
+ @Override
+ public AmqpProvider getProvider() {
+ return this;
+ }
+
+ /**
+ * Allows a resource to request that its parent resource schedule a future
+ * cancellation of a request and return it a {@link Future} instance that
+ * can be used to cancel the scheduled automatic failure of the request.
+ *
+ * @param request
+ * The request that should be marked as failed based on configuration.
+ * @param error
+ * The error to use when failing the pending request.
+ *
+ * @return a {@link ScheduledFuture} that can be stored by the caller.
+ */
+ public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) {
+ if (getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+ return serializer.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ request.onFailure(error);
+ pumpToProtonTransport();
+ }
+
+ }, getRequestTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ return null;
+ }
+
+ Principal getLocalPrincipal() {
+ if (transport instanceof SSLTransport) {
+ return ((SSLTransport) transport).getLocalPrincipal();
+ }
+
+ return null;
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private void checkClosed() throws ProviderClosedException {
+ if (closed.get()) {
+ throw new ProviderClosedException("This Provider is already closed");
+ }
+ }
+
private final class IdleTimeoutCheck implements Runnable {
@Override
public void run() {
@@ -1170,14 +1210,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
- Principal getLocalPrincipal() {
- if (transport instanceof SSLTransport) {
- return ((SSLTransport) transport).getLocalPrincipal();
- }
-
- return null;
- }
-
private static void setHostname(Sasl sasl, String hostname) {
// TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
index 6486719..5213a78 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
@@ -40,4 +40,9 @@ public interface AmqpResourceParent {
*/
void removeChildResource(AmqpResource resource);
+ /**
+ * @return a reference to the root AmqpProvider.
+ */
+ AmqpProvider getProvider();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index f9b461f..eb49c5f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -259,6 +259,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
getConnection().getProvider().fireProviderException(error);
}
+ @Override
public AmqpProvider getProvider() {
return connection.getProvider();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 930fd32..b650892 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -253,7 +253,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
txConsumers.clear();
}
- //----- Resource Parent event handlers -----------------------------------//
+ //----- Resource Parent implementation -----------------------------------//
@Override
public void addChildResource(AmqpResource resource) {
@@ -268,4 +268,9 @@ public class AmqpTransactionContext implements AmqpResourceParent {
// to check if the current TX has failed due to link closed during
// normal operations.
}
+
+ @Override
+ public AmqpProvider getProvider() {
+ return session.getProvider();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index cad66a3..2de00f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -18,7 +18,6 @@ package org.apache.qpid.jms.provider.amqp.builders;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
-import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -96,16 +95,6 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
}
@Override
- protected void handleOpened(AmqpProvider provider) throws IOException {
- // Initialize the connection properties so that the state of the remote can
- // be determined, this allows us to check for close pending.
- getResource().getProperties().initialize(
- getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
-
- super.handleOpened(provider);
- }
-
- @Override
protected Connection createEndpoint(JmsConnectionInfo resourceInfo) {
String hostname = getParent().getVhost();
if (hostname == null) {
@@ -134,6 +123,14 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
}
@Override
+ protected void afterOpened() {
+ // Initialize the connection properties so that the state of the remote can
+ // be determined, this allows us to check for close pending.
+ getResource().getProperties().initialize(
+ getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
+ }
+
+ @Override
protected boolean isClosePending() {
return getResource().getProperties().isConnectionOpenFailed();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index a6618e7..d8590ef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.provider.amqp.builders;
import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.AsyncResult;
@@ -42,6 +43,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class);
protected AsyncResult request;
+ protected ScheduledFuture<?> requestTimeoutTask;
protected TARGET resource;
protected ENDPOINT endpoint;
protected final PARENT parent;
@@ -61,7 +63,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
* @param request
* The request that initiated the resource creation.
*/
- public void buildResource(AsyncResult request) {
+ public void buildResource(final AsyncResult request) {
this.request = request;
// Create the local end of the manage resource.
@@ -71,6 +73,32 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
// Create the resource object now
resource = createResource(parent, resourceInfo, endpoint);
+
+ if (parent.getProvider().getRequestTimeout() > 0) {
+
+ // Attempt to schedule a cancellation of the pending open request, can return
+ // null if there is no configured request timeout.
+ requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() {
+
+ @Override
+ public void onSuccess() {
+ // Nothing to do here.
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ // We ignore the default error and attempt to coerce a more
+ // meaningful error from the endpoint.
+ handleClosed(parent.getProvider());
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+
+ }, null);
+ }
}
//----- Event handlers ---------------------------------------------------//
@@ -102,15 +130,20 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
//----- Standard open and close handlers ---------------------------------//
- protected void handleOpened(AmqpProvider provider) throws IOException {
+ protected final void handleOpened(AmqpProvider provider) {
+
+ // perform any post open processing prior to opened state inspection.
+ afterOpened();
if (isClosePending()) {
return;
}
- if (isOpenedEndpointValid()) {
- afterOpened();
+ if (requestTimeoutTask != null) {
+ requestTimeoutTask.cancel(false);
+ }
+ if (isOpenedEndpointValid()) {
getEndpoint().setContext(resource);
getParent().addChildResource(resource);
getRequest().onSuccess();
@@ -125,7 +158,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
}
}
- protected void handleClosed(AmqpProvider provider) throws IOException {
+ protected final void handleClosed(AmqpProvider provider) {
// If the resource being built is closed during the creation process
// then this is always an error.
@@ -136,6 +169,10 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
openError = getOpenAbortException();
}
+ if (requestTimeoutTask != null) {
+ requestTimeoutTask.cancel(false);
+ }
+
LOG.warn("Open of resource:({}) failed: {}", resourceInfo, openError.getMessage());
// This resource is now terminated.
@@ -193,7 +230,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
/**
* Called once an endpoint has been opened and validated to give the subclasses a
* place to perform any follow-on processing or setup steps before the operation
- * is deemed to have been completed and success is signalled.
+ * is deemed to have been completed and success is signaled.
*/
protected void afterOpened() {
// Nothing to do here.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
index 01f173b..954d9b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
@@ -105,12 +105,14 @@ public class AmqpTemporaryDestinationBuilder extends AmqpResourceBuilder<AmqpTem
@Override
protected void afterOpened() {
- // Once our sender is opened we can read the updated name from the target address.
- String oldDestinationName = resourceInfo.getName();
- String destinationName = getEndpoint().getRemoteTarget().getAddress();
+ if (!isClosePending()) {
+ // Once our sender is opened we can read the updated name from the target address.
+ String oldDestinationName = resourceInfo.getName();
+ String destinationName = getEndpoint().getRemoteTarget().getAddress();
- resourceInfo.setName(destinationName);
+ resourceInfo.setName(destinationName);
- LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName);
+ LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index a84d260..d56bb62 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -124,6 +124,31 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ ((JmsConnection) connection).setRequestTimeout(500);
+
+ testPeer.expectBegin();
+ // Expect the session, with an immediate link to the transaction coordinator
+ // using a target with the expected capabilities only.
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+ testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null);
+ testPeer.expectDetach(true, false, false);
+
+ try {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("Session create should have failed.");
+ } catch (JMSException ex) {
+ // Expected
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testRemotelyCloseConnectionDuringSessionCreation() throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 4d36e8c..bf26f20 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -47,6 +47,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
@@ -214,6 +215,77 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCreateConsumerFailsWhenLinkRefusalResponseNotSent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ ((JmsConnection) connection).setRequestTimeout(500);
+
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+
+ // Expect a link to a topic node, which we will then refuse
+ SourceMatcher targetMatcher = new SourceMatcher();
+ targetMatcher.withAddress(equalTo(topicName));
+ targetMatcher.withDynamic(equalTo(false));
+ targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+ testPeer.expectReceiverAttach(notNullValue(), targetMatcher, false, true, true, false, null, null);
+ testPeer.expectDetach(true, false, false);
+
+ try {
+ // Create a consumer, expect it to throw exception due to the link-refusal
+ // even though there is no detach response.
+ session.createConsumer(dest);
+ fail("Consumer creation should have failed when link was refused");
+ } catch(JMSException ex) {
+ // Expected
+ LOG.info("Caught expected error on consumer create: {}", ex.getMessage());
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateBrowserFailsWhenLinkRefusalResponseNotSent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ ((JmsConnection) connection).setRequestTimeout(500);
+
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String queueName = "myQueue";
+ Queue dest = session.createQueue(queueName);
+
+ testPeer.expectReceiverAttach(notNullValue(), notNullValue(), true, true, true, false, null, null);
+ testPeer.expectDetach(true, false, false);
+
+ try {
+ // Create a QueueBrowser, expect it to throw exception due to the link-refusal
+ // even though there is no detach response.
+ QueueBrowser browser = session.createBrowser(dest);
+ browser.getEnumeration();
+ fail("Consumer creation should have failed when link was refused");
+ } catch(JMSException ex) {
+ // Expected
+ LOG.info("Caught expected error on browser create: {}", ex.getMessage());
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false);
}
@@ -421,7 +493,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
String topicName = "myTopic";
Topic destination = session.createTopic(topicName);
- testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
+ testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
testPeer.expectDetach(true, true, true);
try {
@@ -446,7 +518,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
String topicName = "myTopic";
Topic destination = session.createTopic(topicName);
- testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
+ testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable");
testPeer.expectDetach(true, true, true);
try {
@@ -897,6 +969,42 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCreateProducerFailsWhenLinkRefusedNoDetachSent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ ((JmsConnection) connection).setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+
+ // Expect a link to a topic node, which we will then refuse
+ TargetMatcher targetMatcher = new TargetMatcher();
+ targetMatcher.withAddress(equalTo(topicName));
+ targetMatcher.withDynamic(equalTo(false));
+ targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+ testPeer.expectSenderAttach(notNullValue(), targetMatcher, true, true, false, 0, null, null);
+ // Expect the detach response to the test peer closing the producer link after refusal.
+ testPeer.expectDetach(true, false, false);
+
+ try {
+ // Create a producer, expect it to throw exception due to the link-refusal
+ session.createProducer(dest);
+ fail("Producer creation should have failed when link was refused");
+ } catch(JMSException ex) {
+ // Expected
+ LOG.info("Caught expected exception on create: {}", ex.getMessage());
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7fc557b..9396e3b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -845,7 +845,7 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSenderAttach(long creditFlowDelay)
{
- expectSenderAttach(notNullValue(), notNullValue(), false, false, creditFlowDelay, null, null);
+ expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null);
}
public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
@@ -855,10 +855,10 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
{
- expectSenderAttach(notNullValue(), targetMatcher, refuseLink, deferAttachResponseWrite, 0, null, null);
+ expectSenderAttach(notNullValue(), targetMatcher, refuseLink, false, deferAttachResponseWrite, 0, null, null);
}
- public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
+ public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
{
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(notNullValue())
@@ -908,53 +908,56 @@ public class TestAmqpPeer implements AutoCloseable
attachResponseSender.setDeferWrite(true);
}
- final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+ CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+ composite.add(attachResponseSender);
+ if (refuseLink) {
+ if (!omitDetach) {
+ final DetachFrame detachResponse = new DetachFrame().setClosed(true);
+ if (errorType != null)
+ {
+ org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+
+ detachError.setCondition(errorType);
+ detachError.setDescription(errorMessage);
+
+ detachResponse.setError(detachError);
+ }
+
+ // The response frame channel will be dynamically set based on the
+ // incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
+ detachResonseSender.setValueProvider(new ValueProvider() {
+ @Override
+ public void setValues() {
+ detachResonseSender.setChannel(attachMatcher.getActualChannel());
+ detachResponse.setHandle(attachMatcher.getReceivedHandle());
+ }
+ });
+
+ composite.add(detachResonseSender);
+ }
+ } else {
+ final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
.setIncomingWindow(UnsignedInteger.valueOf(2048))
.setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
.setOutgoingWindow(UnsignedInteger.valueOf(2048))
.setLinkCredit(UnsignedInteger.valueOf(100));
- // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
- flowFrameSender.setValueProvider(new ValueProvider()
- {
- @Override
- public void setValues()
+ // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
+ flowFrameSender.setValueProvider(new ValueProvider()
{
- flowFrameSender.setChannel(attachMatcher.getActualChannel());
- flowFrame.setHandle(attachMatcher.getReceivedHandle());
- flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
- }
- });
- flowFrameSender.setSendDelay(creditFlowDelay);
-
- final DetachFrame detachResponse = new DetachFrame().setClosed(true);
- if (errorType != null)
- {
- org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
-
- detachError.setCondition(errorType);
- detachError.setDescription(errorMessage);
-
- detachResponse.setError(detachError);
- }
+ @Override
+ public void setValues()
+ {
+ flowFrameSender.setChannel(attachMatcher.getActualChannel());
+ flowFrame.setHandle(attachMatcher.getReceivedHandle());
+ flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
+ }
+ });
- // The response frame channel will be dynamically set based on the
- // incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
- detachResonseSender.setValueProvider(new ValueProvider() {
- @Override
- public void setValues() {
- detachResonseSender.setChannel(attachMatcher.getActualChannel());
- detachResponse.setHandle(attachMatcher.getReceivedHandle());
- }
- });
+ flowFrameSender.setSendDelay(creditFlowDelay);
- CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
- composite.add(attachResponseSender);
- if (refuseLink) {
- composite.add(detachResonseSender);
- } else {
composite.add(flowFrameSender);
}
@@ -975,7 +978,7 @@ public class TestAmqpPeer implements AutoCloseable
public void expectCoordinatorAttach(final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
{
- expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, deferAttachResponseWrite, 0, errorType, errorMessage);
+ expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, false, deferAttachResponseWrite, 0, errorType, errorMessage);
}
public void expectQueueBrowserAttach()
@@ -990,25 +993,25 @@ public class TestAmqpPeer implements AutoCloseable
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher)
{
- expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, null, null);
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null);
}
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled)
{
- expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, null, null);
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, false, null, null);
}
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
{
- expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, deferAttachResponseWrite, null, null);
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, deferAttachResponseWrite, null, null);
}
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite)
{
- expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, deferAttachResponseWrite, null, null);
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null);
}
- public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
+ public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
{
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(linkNameMatcher)
@@ -1057,7 +1060,7 @@ public class TestAmqpPeer implements AutoCloseable
CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
composite.add(attachResponseSender);
- if (refuseLink)
+ if (refuseLink && !omitDetach)
{
final DetachFrame detachResponse = new DetachFrame().setClosed(true);
if (errorType != null)
@@ -1072,18 +1075,18 @@ public class TestAmqpPeer implements AutoCloseable
// The response frame channel will be dynamically set based on the
// incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
- detachResonseSender.setValueProvider(new ValueProvider()
+ final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
+ detachResponseSender.setValueProvider(new ValueProvider()
{
@Override
public void setValues()
{
- detachResonseSender.setChannel(attachMatcher.getActualChannel());
+ detachResponseSender.setChannel(attachMatcher.getActualChannel());
detachResponse.setHandle(attachMatcher.getReceivedHandle());
}
});
- composite.add(detachResonseSender);
+ composite.add(detachResponseSender);
}
attachMatcher.onCompletion(composite);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org