You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/22 21:43:15 UTC
qpid-jms git commit: QPIDJMS-157 Adds support for sendTimeout and
requestTimeout in the AMQP provider.
Repository: qpid-jms
Updated Branches:
refs/heads/master 4e963e442 -> e0b5980c0
QPIDJMS-157 Adds support for sendTimeout and requestTimeout in the AMQP
provider.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e0b5980c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e0b5980c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e0b5980c
Branch: refs/heads/master
Commit: e0b5980c077555b0d0a3c356d6d0580ec87c6497
Parents: 4e963e4
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 22 16:42:59 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 22 16:42:59 2016 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpFixedProducer.java | 109 ++++++---
.../qpid/jms/provider/amqp/AmqpProvider.java | 44 +++-
.../amqp/AmqpTransactionCoordinator.java | 20 ++
.../amqp/builders/AmqpConnectionBuilder.java | 5 +
.../amqp/builders/AmqpResourceBuilder.java | 31 ++-
.../org/apache/qpid/jms/JmsConnectionTest.java | 2 +-
.../FailedConnectionsIntegrationTest.java | 29 ++-
.../integration/ProducerIntegrationTest.java | 152 +++++++++++-
.../QueueBrowserIntegrationTest.java | 20 +-
.../jms/integration/SessionIntegrationTest.java | 7 +-
.../TransactionsIntegrationTest.java | 242 +++++++++++++++++++
.../failover/FailoverIntegrationTest.java | 6 +-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 66 ++++-
13 files changed, 644 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 1282138..b582052 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -22,11 +22,14 @@ import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
import javax.jms.JMSException;
+import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
@@ -57,8 +60,8 @@ public class AmqpFixedProducer extends AmqpProducer {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
- private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
- private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>();
+ private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
+ private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
private byte[] encodeBuffer = new byte[1024 * 8];
private boolean presettle = false;
@@ -73,7 +76,7 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public void close(AsyncResult request) {
// If any sends are held we need to wait for them to complete.
- if (!pendingSends.isEmpty()) {
+ if (!blocked.isEmpty()) {
this.closeRequest = request;
return;
}
@@ -83,15 +86,20 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
- // TODO - Handle the case where remote has no credit which means we can't send to it.
- // We need to hold the send until remote credit becomes available but we should
- // also have a send timeout option and filter timed out sends.
if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
// Once a message goes into a held mode we no longer can send it async, so
// we clear the async flag if set to avoid the sender never getting notified.
envelope.setSendAsync(false);
- this.pendingSends.addLast(new PendingSend(envelope, request));
+
+ InFlightSend send = new InFlightSend(envelope, request);
+
+ if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
+ send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
+ send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage()));
+ }
+
+ blocked.addLast(send);
return false;
} else {
doSend(envelope, request);
@@ -135,12 +143,20 @@ public class AmqpFixedProducer extends AmqpProducer {
if (presettle) {
delivery.settle();
} else {
- pending.add(delivery);
+ sent.add(delivery);
getEndpoint().advance();
}
if (envelope.isSendAsync() || presettle) {
request.onSuccess();
+ } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) {
+ InFlightSend send = new InFlightSend(envelope, request);
+
+ send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
+ send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage()));
+
+ // Update context so the incoming disposition can cancel any pending timeout
+ delivery.setContext(send);
}
}
@@ -173,12 +189,12 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public void processFlowUpdates(AmqpProvider provider) throws IOException {
- if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
- while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
+ if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) {
+ while (getEndpoint().getCredit() > 0 && !blocked.isEmpty()) {
LOG.trace("Dispatching previously held send");
- PendingSend held = pendingSends.pop();
+ InFlightSend held = blocked.pop();
try {
- doSend(held.envelope, held.request);
+ doSend(held.envelope, held); // TODO - Cancel timeout and reset after dispatch ?
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
@@ -186,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer {
}
// Once the pending sends queue is drained we can propagate the close request.
- if (pendingSends.isEmpty() && isAwaitingClose()) {
+ if (blocked.isEmpty() && isAwaitingClose()) {
super.close(closeRequest);
}
@@ -197,7 +213,7 @@ public class AmqpFixedProducer extends AmqpProducer {
public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
List<Delivery> toRemove = new ArrayList<Delivery>();
- for (Delivery delivery : pending) {
+ for (Delivery delivery : sent) {
DeliveryState state = delivery.getRemoteState();
if (state == null) {
continue;
@@ -251,7 +267,7 @@ public class AmqpFixedProducer extends AmqpProducer {
delivery.settle();
}
- pending.removeAll(toRemove);
+ sent.removeAll(toRemove);
super.processDeliveryUpdates(provider);
}
@@ -275,22 +291,15 @@ public class AmqpFixedProducer extends AmqpProducer {
return presettle;
}
+ public long getSendTimeout() {
+ return getParent().getProvider().getSendTimeout();
+ }
+
@Override
public String toString() {
return "AmqpFixedProducer { " + getProducerId() + " }";
}
- private static class PendingSend {
-
- public JmsOutboundMessageDispatch envelope;
- public AsyncResult request;
-
- public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
- this.envelope = envelope;
- this.request = request;
- }
- }
-
@Override
public void remotelyClosed(AmqpProvider provider) {
super.remotelyClosed(provider);
@@ -301,7 +310,7 @@ public class AmqpFixedProducer extends AmqpProducer {
ex = new JMSException("Producer closed remotely before message transfer result was notified");
}
- for (Delivery delivery : pending) {
+ for (Delivery delivery : sent) {
try {
AsyncResult request = (AsyncResult) delivery.getContext();
@@ -316,6 +325,50 @@ public class AmqpFixedProducer extends AmqpProducer {
}
}
- pending.clear();
+ sent.clear();
+ }
+
+ //----- Class used to manage held sends ----------------------------------//
+
+ private class InFlightSend implements AsyncResult {
+
+ public final JmsOutboundMessageDispatch envelope;
+ public final AsyncResult request;
+
+ public ScheduledFuture<?> requestTimeout;
+
+ public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
+ this.envelope = envelope;
+ this.request = request;
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (requestTimeout != null) {
+ requestTimeout.cancel(false);
+ requestTimeout = null;
+ }
+
+ blocked.remove(this);
+
+ request.onFailure(cause);
+ }
+
+ @Override
+ public void onSuccess() {
+ if (requestTimeout != null) {
+ requestTimeout.cancel(false);
+ requestTimeout = null;
+ }
+
+ blocked.remove(this);
+
+ request.onSuccess();
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 4cb1028..5520ec4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -768,39 +768,57 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
switch (protonEvent.getType()) {
case CONNECTION_REMOTE_CLOSE:
amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
- amqpEventSink.processRemoteClose(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteClose(this);
+ }
break;
case CONNECTION_REMOTE_OPEN:
amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
- amqpEventSink.processRemoteOpen(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteOpen(this);
+ }
break;
case SESSION_REMOTE_CLOSE:
amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
- amqpEventSink.processRemoteClose(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteClose(this);
+ }
break;
case SESSION_REMOTE_OPEN:
amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
- amqpEventSink.processRemoteOpen(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteOpen(this);
+ }
break;
case LINK_REMOTE_CLOSE:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
- amqpEventSink.processRemoteClose(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteClose(this);
+ }
break;
case LINK_REMOTE_DETACH:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
- amqpEventSink.processRemoteDetach(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteDetach(this);
+ }
break;
case LINK_REMOTE_OPEN:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
- amqpEventSink.processRemoteOpen(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processRemoteOpen(this);
+ }
break;
case LINK_FLOW:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
- amqpEventSink.processFlowUpdates(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processFlowUpdates(this);
+ }
break;
case DELIVERY:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
- amqpEventSink.processDeliveryUpdates(this);
+ if (amqpEventSink != null) {
+ amqpEventSink.processDeliveryUpdates(this);
+ }
break;
default:
break;
@@ -1137,13 +1155,15 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
*
* @param request
* The request that should be marked as failed based on configuration.
+ * @param timeout
+ * The time to wait before marking the request as failed.
* @param error
* The error to use when failing the pending request.
*
* @return a {@link ScheduledFuture} that can be stored by the caller.
*/
- public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) {
- if (getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+ public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final Exception error) {
+ if (timeout != JmsConnectionInfo.INFINITE) {
return serializer.schedule(new Runnable() {
@Override
@@ -1152,7 +1172,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
pumpToProtonTransport();
}
- }, getRequestTimeout(), TimeUnit.MILLISECONDS);
+ }, timeout, TimeUnit.MILLISECONDS);
}
return null;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index b06f686..f0ddf96 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -18,11 +18,14 @@ package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
import java.nio.BufferOverflowException;
+import java.util.concurrent.ScheduledFuture;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.AsyncResult;
@@ -57,6 +60,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
private Delivery pendingDelivery;
private AsyncResult pendingRequest;
+ private ScheduledFuture<?> pendingTimeout;
public AmqpTransactionCoordinator(JmsSessionInfo resourceInfo, Sender endpoint, AmqpResourceParent parent) {
super(resourceInfo, endpoint, parent);
@@ -94,6 +98,11 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
pendingDelivery.settle();
pendingRequest = null;
pendingDelivery = null;
+
+ if (pendingTimeout != null) {
+ pendingTimeout.cancel(false);
+ pendingTimeout = null;
+ }
}
super.processDeliveryUpdates(provider);
@@ -120,6 +129,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
pendingDelivery.setContext(txId);
pendingRequest = request;
+ scheduleTimeoutIfNeeded("Timed out waiting for declare of new TX.");
+
sendTxCommand(message);
}
@@ -154,6 +165,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
pendingDelivery.setContext(txId);
pendingRequest = request;
+ scheduleTimeoutIfNeeded("Timed out waiting for discharge of TX.");
+
sendTxCommand(message);
}
@@ -190,6 +203,13 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
//----- Internal implementation ------------------------------------------//
+ private void scheduleTimeoutIfNeeded(String cause) {
+ AmqpProvider provider = getParent().getProvider();
+ if (provider.getRequestTimeout() != JmsConnectionInfo.INFINITE) {
+ provider.scheduleRequestTimeout(pendingRequest, provider.getRequestTimeout(), new JmsOperationTimedOutException(cause));
+ }
+ }
+
private void sendTxCommand(Message message) throws IOException {
int encodedSize = 0;
byte[] buffer = OUTBOUND_BUFFER;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index 1f74682..71a4dd4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -134,4 +134,9 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
protected boolean isClosePending() {
return getResource().getProperties().isConnectionOpenFailed();
}
+
+ @Override
+ protected long getRequestTimeout() {
+ return getParent().getProvider().getConnectTimeout();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index c2353e8..229e61f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -19,6 +19,8 @@ package org.apache.qpid.jms.provider.amqp.builders;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
@@ -74,7 +76,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
// Create the resource object now
resource = createResource(parent, resourceInfo, endpoint);
- if (parent.getProvider().getRequestTimeout() > 0) {
+ if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
// Attempt to schedule a cancellation of the pending open request, can return
// null if there is no configured request timeout.
@@ -87,9 +89,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
@Override
public void onFailure(Throwable result) {
- // We ignore the default error and attempt to coerce a more
- // meaningful error from the endpoint.
- handleClosed(parent.getProvider());
+ handleClosed(parent.getProvider(), result);
}
@Override
@@ -97,7 +97,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
return request.isComplete();
}
- }, null);
+ }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out"));
}
}
@@ -110,7 +110,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
@Override
public void processRemoteClose(AmqpProvider provider) throws IOException {
- handleClosed(provider);
+ handleClosed(provider, null);
}
@Override
@@ -158,13 +158,15 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
}
}
- protected final void handleClosed(AmqpProvider provider) {
+ protected final void handleClosed(AmqpProvider provider, Throwable cause) {
// If the resource being built is closed during the creation process
// then this is always an error.
- Exception openError;
+ Throwable openError;
if (hasRemoteError()) {
openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+ } else if (cause != null) {
+ openError = cause;
} else {
openError = getOpenAbortException();
}
@@ -244,11 +246,24 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
* When aborting the open operation, and there isn't an error condition,
* provided by the peer, the returned exception will be used instead.
* A subclass may override this method to provide alternative behavior.
+ *
+ * @return an Exception to describes the open failure for this resource.
*/
protected Exception getOpenAbortException() {
return new IOException("Open failed unexpectedly.");
}
+ /**
+ * Returns the configured time before the open of the resource is considered
+ * to have failed. Subclasses can override this method to provide a value more
+ * appropriate to the resource being built.
+ *
+ * @return the configured timeout before the open of the resource fails.
+ */
+ protected long getRequestTimeout() {
+ return getParent().getProvider().getRequestTimeout();
+ }
+
//----- Public access methods for the managed resources ------------------//
public ENDPOINT getEndpoint() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 6dabcca..bf5bbcf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -68,7 +68,7 @@ public class JmsConnectionTest {
@Test(timeout=30000, expected=JMSException.class)
public void testJmsConnectionThrowsJMSExceptionProviderStartFails() throws JMSException, IllegalStateException, IOException {
provider.getConfiguration().setFailOnStart(true);
- new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ try (JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);) {}
}
@Test(timeout=30000)
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
index b9c44af..556ae64 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
@@ -34,6 +34,7 @@ import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.provider.ProviderRedirectedException;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -71,6 +72,24 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testConnectThrowsTimedOutExceptioWhenResponseNotSent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ testPeer.expectSaslAnonymousConnect(true);
+ testPeer.expectClose();
+ try {
+ establishAnonymousConnecton(testPeer, true, "jms.connectTimeout=500");
+ fail("Should have thrown JmsOperationTimedOutException");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ // Expected
+ } catch (Exception ex) {
+ fail("Should have thrown JMSException: " + ex);
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testConnectWithNotFoundErrorThrowsJMSEWhenInvalidContainerHintNotPresent() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
testPeer.rejectConnect(AmqpError.NOT_FOUND, "Virtual Host does not exist", null);
@@ -158,8 +177,16 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
}
Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId) throws JMSException {
+ return establishAnonymousConnecton(testPeer, setClientId, null);
+ }
+
+ Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId, String connectionQuery) throws JMSException {
- final String remoteURI = "amqp://localhost:" + testPeer.getServerPort();
+ String remoteURI = "amqp://localhost:" + testPeer.getServerPort();
+
+ if (connectionQuery != null && !connectionQuery.isEmpty()) {
+ remoteURI += "?" + connectionQuery;
+ }
ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = factory.createConnection();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 2dd723b..de1e6b3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -49,6 +49,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
@@ -88,7 +89,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
MessageProducer producer = session.createProducer(queue);
testPeer.expectDetach(true, true, true);
+ testPeer.expectClose();
+
producer.close();
+ connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
@@ -116,6 +120,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
TextMessage message = session.createTextMessage(text);
producer.send(message);
@@ -124,6 +129,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
message.setText(text + text);
assertEquals(text + text, message.getText());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -147,12 +154,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage();
producer.send(message);
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -178,6 +188,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage();
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
@@ -187,6 +198,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -219,6 +232,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
@@ -228,6 +242,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertEquals("Should have had JMSDestination set", queue, message.getJMSDestination());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -263,11 +279,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
producer.send(message);
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -301,14 +320,17 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp());
producer.setDisableMessageTimestamp(true);
-
producer.send(message);
+
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp());
@@ -350,11 +372,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -406,12 +431,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl);
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl);
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(2000);
}
}
@@ -441,6 +469,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage();
@@ -450,6 +479,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -481,6 +512,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage();
@@ -490,6 +522,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertEquals(priority, message.getJMSPriority());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -522,6 +556,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
@@ -533,7 +568,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
- //Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+ connection.close();
+
+ // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
testPeer.waitForAllHandlersToComplete(1000);
Object receivedMessageId = propsMatcher.getReceivedMessageId();
@@ -571,6 +608,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
@@ -582,7 +620,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
- //Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally
+ connection.close();
+
+ // Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally
testPeer.waitForAllHandlersToComplete(1000);
Object receivedMessageId = propsMatcher.getReceivedMessageId();
@@ -621,6 +661,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
@@ -632,7 +673,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
- //Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
+ connection.close();
+
+ // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
testPeer.waitForAllHandlersToComplete(1000);
Object receivedMessageId = propsMatcher.getReceivedMessageId();
@@ -687,6 +730,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
Message message = session.createTextMessage(text);
@@ -704,6 +748,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertNull("JMSMessageID should be null", message.getJMSMessageID());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(2000);
}
}
@@ -778,6 +824,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
// response, simply remotely close the producer instead.
testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+ testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(queue);
@@ -793,6 +840,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(3000);
}
}
@@ -860,10 +909,89 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
testPeer.expectSenderAttach(100);
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
MessageProducer producer = session.createProducer(queue);
producer.send(message);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testSendWhenLinkCreditIsZeroAndTimeout() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(500);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ Message message = session.createTextMessage("text");
+
+ // Expect the producer to attach. Don't send any credit so that the client will
+ // block on a send and we can test our timeouts.
+ testPeer.expectSenderAttachWithoutGrantingCredit();
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+
+ try {
+ producer.send(message);
+ fail("Send should time out.");
+ } catch (JmsSendTimedOutException jmsEx) {
+ LOG.info("Caught expected error: {}", jmsEx.getMessage());
+ } catch (Throwable error) {
+ fail("Send should time out, but got: " + error.getMessage());
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testSendTimesOutWhenNoDispostionArrives() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(500);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ Message message = session.createTextMessage("text");
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer which we will not send any response for which should cause the
+ // send operation to time out.
+ testPeer.expectSenderAttach(100);
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+
+ try {
+ producer.send(message);
+ fail("Send should time out.");
+ } catch (JmsSendTimedOutException jmsEx) {
+ LOG.info("Caught expected error: {}", jmsEx.getMessage());
+ } catch (Throwable error) {
+ fail("Send should time out, but got: " + error.getMessage());
+ }
+
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -915,6 +1043,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
Message message = session.createTextMessage("content");
testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true);
+ testPeer.expectClose();
assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
@@ -926,6 +1055,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
// Expected
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(2000);
}
}
@@ -967,13 +1098,12 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
private void doAsyncSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
final CountDownLatch asyncError = new CountDownLatch(1);
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.setForceAsyncSend(true);
- jmsConnection.setExceptionListener(new ExceptionListener() {
+ connection.setForceAsyncSend(true);
+ connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
@@ -1011,6 +1141,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
assertTrue("Should get a non-fatal error", asyncError.await(10, TimeUnit.SECONDS));
testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
try {
producer.send(message);
@@ -1019,6 +1150,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
fail("No expected exception for this send.");
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(2000);
}
}
@@ -1078,7 +1211,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
producer.send(session.createMessage());
testPeer.expectDetach(true, true, true);
+ testPeer.expectClose();
+
producer.close();
+ connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index b13fa00..b041f51 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -146,11 +146,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.start();
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.getPrefetchPolicy().setAll(1);
+ connection.getPrefetchPolicy().setAll(1);
testPeer.expectBegin();
@@ -305,11 +304,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
@Test(timeout=30000)
public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.start();
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.getPrefetchPolicy().setAll(0);
+ connection.getPrefetchPolicy().setAll(0);
testPeer.expectBegin();
@@ -333,11 +331,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.start();
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.getPrefetchPolicy().setAll(0);
+ connection.getPrefetchPolicy().setAll(0);
testPeer.expectBegin();
@@ -365,11 +362,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.start();
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.getPrefetchPolicy().setAll(0);
+ connection.getPrefetchPolicy().setAll(0);
testPeer.expectBegin();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 415ffb1..7275cac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -56,6 +56,7 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsPrefetchPolicy;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -243,7 +244,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
// even though there is no detach response.
session.createConsumer(dest);
fail("Consumer creation should have failed when link was refused");
- } catch(JMSException ex) {
+ } catch(JmsOperationTimedOutException ex) {
// Expected
LOG.info("Caught expected error on consumer create: {}", ex.getMessage());
}
@@ -276,7 +277,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
QueueBrowser browser = session.createBrowser(dest);
browser.getEnumeration();
fail("Consumer creation should have failed when link was refused");
- } catch(JMSException ex) {
+ } catch(JmsOperationTimedOutException ex) {
// Expected
LOG.info("Caught expected error on browser create: {}", ex.getMessage());
}
@@ -995,7 +996,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
// Create a producer, expect it to throw exception due to the link-refusal
session.createProducer(dest);
fail("Producer creation should have failed when link was refused");
- } catch(JMSException ex) {
+ } catch(JmsOperationTimedOutException ex) {
// Expected
LOG.info("Caught expected exception on create: {}", ex.getMessage());
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 2ef7046..dfd2f1c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -39,6 +39,7 @@ import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -146,6 +147,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
try {
session.commit();
@@ -153,6 +156,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
} catch (TransactionRolledBackException jmsTxRb) {
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -225,9 +230,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
producer.send(session.createMessage());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -300,9 +309,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
producer.send(session.createMessage());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -410,6 +423,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
messageConsumer.close();
}
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -450,9 +468,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
session.commit();
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -493,9 +515,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
session.rollback();
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -537,9 +563,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
producer.send(session.createMessage());
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -634,8 +664,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
}
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
session.rollback();
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -705,8 +740,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect the consumer to be 'started' again as rollback completes
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
session.rollback();
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -780,9 +820,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect the consumer to be 'started' again as rollback completes
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
session.rollback();
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -814,9 +858,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
testPeer.expectLinkFlow();
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
session.createConsumer(queue);
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -835,6 +883,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, false);
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -845,6 +895,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
LOG.info("Caught expected TransactionRolledBackException");
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -863,6 +915,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, true);
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -873,6 +927,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
LOG.info("Caught expected JMSException");
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -911,12 +967,18 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
+ // Expect that the session TX will rollback on close.
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
try {
session.commit();
fail("Commit operation should have failed.");
} catch (TransactionRolledBackException jmsTxRb) {
}
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
@@ -960,12 +1022,192 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
+ // Expect that the session TX will rollback on close.
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
try {
session.commit();
fail("Commit operation should have failed.");
} catch (TransactionRolledBackException jmsTxRb) {
}
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSessionCreateFailsOnDeclareTimeout() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+ testPeer.expectDeclareButDoNotRespond();
+ testPeer.expectClose();
+
+ try {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("Should have timed out waiting for declare.");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ } catch (Throwable error) {
+ fail("Should have caught an timed out exception:");
+ LOG.error("Caught -> ", error);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ // Closed session should roll-back the TX with a failed discharge
+ testPeer.expectDischargeButDoNotRespond(txnId, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ try {
+ session.close();
+ fail("Should have timed out waiting for declare.");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ } catch (Throwable error) {
+ fail("Should have caught an timed out exception:");
+ LOG.error("Caught -> ", error);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testTransactionRolledBackTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ // Closed session should roll-back the TX with a failed discharge
+ testPeer.expectDischargeButDoNotRespond(txnId, true);
+
+ // Session should throw from the rollback and then try and recover.
+ testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ try {
+ session.rollback();
+ fail("Should have timed out waiting for declare.");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ } catch (Throwable error) {
+ fail("Should have caught an timed out exception:");
+ LOG.error("Caught -> ", error);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testTransactionCommitTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ // Closed session should roll-back the TX with a failed discharge
+ testPeer.expectDischargeButDoNotRespond(txnId, false);
+
+ // Session should throw from the commit and then try and recover.
+ testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ try {
+ session.commit();
+ fail("Should have timed out waiting for declare.");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ } catch (Throwable error) {
+ fail("Should have caught an timed out exception:");
+ LOG.error("Caught -> ", error);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testTransactionCommitTimesOutAndNoNextBeginTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ // Closed session should roll-back the TX with a failed discharge
+ testPeer.expectDischargeButDoNotRespond(txnId, false);
+
+ // Session should throw from the commit and then try and recover.
+ testPeer.expectDeclareButDoNotRespond();
+ testPeer.expectClose();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ try {
+ session.commit();
+ fail("Should have timed out waiting for declare.");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ } catch (Throwable error) {
+ fail("Should have caught an timed out exception:");
+ LOG.error("Caught -> ", error);
+ }
+
+ connection.close();
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 6b6dcb5..64af655 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -868,7 +868,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
testPeer.dropAfterLastHandler();
final JmsConnection connection = establishAnonymousConnecton(
- "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+ "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@@ -925,7 +925,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
testPeer.dropAfterLastHandler();
final JmsConnection connection = establishAnonymousConnecton(
- "jms.sendTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+ "jms.sendTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@@ -986,7 +986,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
testPeer.dropAfterLastHandler();
final JmsConnection connection = establishAnonymousConnecton(
- "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+ "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index d4b304e..7ffdb2e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -20,11 +20,11 @@ package org.apache.qpid.jms.test.testpeer;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -419,7 +419,8 @@ public class TestAmqpPeer implements AutoCloseable
}
public void expectSaslConnect(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
- Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher)
+ Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher,
+ Matcher<?> hostnameMatcher, boolean deferOpened)
{
SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
@@ -467,12 +468,10 @@ public class TestAmqpPeer implements AutoCloseable
open.setProperties(serverProperties);
}
- OpenMatcher openMatcher = new OpenMatcher()
- .withContainerId(notNullValue(String.class))
- .onCompletion(new FrameSender(
- this, FrameType.AMQP, 0,
- open,
- null));
+ OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class));
+ if (!deferOpened) {
+ openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null));
+ }
if (desiredCapabilities != null)
{
@@ -515,7 +514,7 @@ public class TestAmqpPeer implements AutoCloseable
Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
- expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null);
+ expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
}
public void expectSaslExternalConnect()
@@ -525,7 +524,7 @@ public class TestAmqpPeer implements AutoCloseable
throw new IllegalStateException("need-client-cert must be enabled on the test peer");
}
- expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null);
+ expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, false);
}
public void expectSaslAnonymousConnect()
@@ -533,6 +532,11 @@ public class TestAmqpPeer implements AutoCloseable
expectSaslAnonymousConnect(null, null);
}
+ public void expectSaslAnonymousConnect(boolean deferOpened)
+ {
+ expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, deferOpened);
+ }
+
public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher)
{
expectSaslAnonymousConnect(idleTimeoutMatcher, hostnameMatcher, null, null);
@@ -540,7 +544,7 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, Matcher<?> propertiesMatcher, Map<Symbol, Object> serverProperties)
{
- expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher);
+ expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher, false);
}
public void expectFailingSaslConnect(Symbol[] serverMechs, Symbol clientSelectedMech)
@@ -844,9 +848,14 @@ public class TestAmqpPeer implements AutoCloseable
expectSenderAttach(notNullValue(), false, false);
}
+ public void expectSenderAttachWithoutGrantingCredit()
+ {
+ expectSenderAttach(notNullValue(), notNullValue(), false, false, false, 0, 0, null, null);
+ }
+
public void expectSenderAttach(long creditFlowDelay)
{
- expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null);
+ expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, 100, null, null);
}
public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
@@ -861,6 +870,11 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage)
{
+ expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, 100, errorType, errorMessage);
+ }
+
+ public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
+ {
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(notNullValue())
.withHandle(notNullValue())
@@ -942,7 +956,7 @@ public class TestAmqpPeer implements AutoCloseable
.setIncomingWindow(UnsignedInteger.valueOf(2048))
.setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
.setOutgoingWindow(UnsignedInteger.valueOf(2048))
- .setLinkCredit(UnsignedInteger.valueOf(100));
+ .setLinkCredit(UnsignedInteger.valueOf(creditAmount));
// The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
@@ -1476,6 +1490,11 @@ public class TestAmqpPeer implements AutoCloseable
return payloadData.encode();
}
+ public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher)
+ {
+ expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false);
+ }
+
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
{
expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
@@ -1538,6 +1557,14 @@ public class TestAmqpPeer implements AutoCloseable
expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
}
+ public void expectDeclareButDoNotRespond()
+ {
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+
+ expectTransfer(declareMatcher, nullValue(), false, false, null, false);
+ }
+
public void expectDischarge(Binary txnId, boolean dischargeState) {
expectDischarge(txnId, dischargeState, new Accepted());
}
@@ -1555,6 +1582,19 @@ public class TestAmqpPeer implements AutoCloseable
expectTransfer(dischargeMatcher, nullValue(), false, responseState, true);
}
+ public void expectDischargeButDoNotRespond(Binary txnId, boolean dischargeState) {
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with given response and settled disposition to indicate the outcome.
+ Discharge discharge = new Discharge();
+ discharge.setFail(dischargeState);
+ discharge.setTxnId(txnId);
+
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+
+ expectTransfer(dischargeMatcher, nullValue(), false, false, null, true);
+ }
+
public void remotelyCloseLastCoordinatorLink()
{
remotelyCloseLastCoordinatorLink(true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org