You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/06 21:54:10 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-175
Repository: qpid-jms
Updated Branches:
refs/heads/master 974037eac -> 926fc571d
https://issues.apache.org/jira/browse/QPIDJMS-175
Add the amqp.drainTimeout option to handle the case where the remote peer
doesn't send any response to a drain request.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/926fc571
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/926fc571
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/926fc571
Branch: refs/heads/master
Commit: 926fc571da5023137bb42035d84139b8da3a981d
Parents: 974037e
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 6 17:54:00 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 6 17:54:00 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 14 ++---
.../apache/qpid/jms/JmsConnectionListener.java | 24 ++++----
.../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +-
.../java/org/apache/qpid/jms/JmsSession.java | 16 ++---
.../jms/provider/DefaultProviderListener.java | 2 +-
.../qpid/jms/provider/ProviderListener.java | 10 ++--
.../qpid/jms/provider/ProviderWrapper.java | 4 +-
.../jms/provider/amqp/AmqpAbstractResource.java | 6 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 43 +++++++++----
.../qpid/jms/provider/amqp/AmqpProvider.java | 29 +++++++--
.../qpid/jms/JmsDefaultConnectionListener.java | 6 +-
.../integration/ConsumerIntegrationTest.java | 63 +++++++++++++++++++-
.../integration/ProducerIntegrationTest.java | 2 +-
.../jms/integration/SessionIntegrationTest.java | 4 +-
.../JmsConsumerPriorityDispatchTest.java | 12 ++--
.../jms/discovery/FileWatcherDiscoveryTest.java | 6 +-
.../jms/discovery/JmsAmqpDiscoveryTest.java | 6 +-
.../transactions/JmsTransactedConsumerTest.java | 6 +-
18 files changed, 178 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 4374954..ab9fb7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1194,7 +1194,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
@Override
- public void onResourceRemotelyClosed(final JmsResource resource, final Exception cause) {
+ public void onResourceClosed(final JmsResource resource, final Exception cause) {
// Closure of the Connection itself is notified via onConnectionFailure
// Run on the connection executor to free the provider to go do more work and avoid
@@ -1207,19 +1207,19 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
if (resource instanceof JmsSessionInfo) {
JmsSession session = sessions.get(resource.getId());
if (session != null) {
- session.remotelyClosed(cause);
+ session.sessionClosed(cause);
for (JmsConnectionListener listener : connectionListeners) {
- listener.onSessionRemotelyClosed(session, cause);
+ listener.onSessionClosed(session, cause);
}
}
} else if (resource instanceof JmsProducerInfo) {
JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
- JmsMessageProducer producer = session.producerRemotelyClosed((JmsProducerInfo) resource, cause);
+ JmsMessageProducer producer = session.producerClosed((JmsProducerInfo) resource, cause);
if (producer != null) {
for (JmsConnectionListener listener : connectionListeners) {
- listener.onProducerRemotelyClosed(producer, cause);
+ listener.onProducerClosed(producer, cause);
}
}
}
@@ -1227,10 +1227,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
- JmsMessageConsumer consumer = session.consumerRemotelyClosed((JmsConsumerInfo) resource, cause);
+ JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
if (consumer != null) {
for (JmsConnectionListener listener : connectionListeners) {
- listener.onConsumerRemotelyClosed(consumer, cause);
+ listener.onConsumerClosed(consumer, cause);
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
index f06f343..7aecc44 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -75,33 +75,33 @@ public interface JmsConnectionListener {
void onInboundMessage(JmsInboundMessageDispatch envelope);
/**
- * Called when the remote peer closes a session.
+ * Called when the session is closed due to remote action or local error detection.
*
* @param session
- * The session that was closed on the remote end.
+ * The session that was closed and needs to be cleaned up.
* @param cause
- * The exception that provides additional context on the remote closure.
+ * The exception that provides additional context on the closure.
*/
- void onSessionRemotelyClosed(Session session, Exception cause);
+ void onSessionClosed(Session session, Exception cause);
/**
- * Called when the remote peer closes a MessageConsumer.
+ * Called when the MessageConsumer is closed due to remote action or local error detection.
*
* @param consumer
- * The consumer that was closed on the remote end.
+ * The consumer that was closed and needs to be cleaned up.
* @param cause
- * The exception that provides additional context on the remote closure.
+ * The exception that provides additional context on the closure.
*/
- void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause);
+ void onConsumerClosed(MessageConsumer consumer, Exception cause);
/**
- * Called when the remote peer closes a MessageProducer.
+ * Called when the MessageProducer is closed due to remote action or local error detection.
*
* @param producer
- * The producer that was closed on the remote end.
+ * The producer that was closed and needs to be cleaned up.
* @param cause
- * The exception that provides additional context on the remote closure.
+ * The exception that provides additional context on the closure.
*/
- void onProducerRemotelyClosed(MessageProducer producer, Exception cause);
+ void onProducerClosed(MessageProducer producer, Exception cause);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 46f19ca..d63af7c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -257,7 +257,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
// is redundant: zero-prefetch consumers already pull, and
// the rest block indefinitely on the local messageQueue.
pullForced = true;
- if(performPullIfRequired(timeout, true)) {
+ if (performPullIfRequired(timeout, true)) {
startConsumerResource();
// We refresh credit if it is a prefetching consumer, since the
// pull drained it. Processing acks can open the credit window, but
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 64c514e..3569fc7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -268,16 +268,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
- void remotelyClosed(Exception cause) {
+ void sessionClosed(Exception cause) {
try {
shutdown(cause);
} catch (Throwable error) {
- LOG.trace("Ignoring exception thrown during cleanup of remotely closed session", error);
+ LOG.trace("Ignoring exception thrown during cleanup of closed session", error);
}
}
- JmsMessageConsumer consumerRemotelyClosed(JmsConsumerInfo resource, Exception cause) {
- LOG.info("A JMS MessageConsumer has been remotely closed: {}", resource);
+ JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Exception cause) {
+ LOG.info("A JMS MessageConsumer has been closed: {}", resource);
JmsMessageConsumer consumer = consumers.get(resource.getId());
@@ -286,14 +286,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
consumer.shutdown(cause);
}
} catch (Throwable error) {
- LOG.trace("Ignoring exception thrown during cleanup of remotely closed consumer", error);
+ LOG.trace("Ignoring exception thrown during cleanup of closed consumer", error);
}
return consumer;
}
- JmsMessageProducer producerRemotelyClosed(JmsProducerInfo resource, Exception cause) {
- LOG.info("A JMS MessageProducer has been remotely closed: {}", resource);
+ JmsMessageProducer producerClosed(JmsProducerInfo resource, Exception cause) {
+ LOG.info("A JMS MessageProducer has been closed: {}", resource);
JmsMessageProducer producer = producers.get(resource.getId());
@@ -302,7 +302,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
producer.shutdown(cause);
}
} catch (Throwable error) {
- LOG.trace("Ignoring exception thrown during cleanup of remotely closed producer", error);
+ LOG.trace("Ignoring exception thrown during cleanup of closed producer", error);
}
return producer;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index b8ad9d2..22e204e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -56,7 +56,7 @@ public class DefaultProviderListener implements ProviderListener {
}
@Override
- public void onResourceRemotelyClosed(JmsResource resource, Exception cause) {
+ public void onResourceClosed(JmsResource resource, Exception cause) {
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index ebb653d..5c758ed 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -112,15 +112,17 @@ public interface ProviderListener {
void onConnectionFailure(IOException ex);
/**
- * Called to indicate that a currently active resource has been closed on the
- * remote end due to management or other action.
+ * Called to indicate that a currently active resource has been closed
+ * due to some error condition, management request or some other action.
+ * This can either be initiated remotely or locally depending on the
+ * condition that triggers the close.
*
* @param resource
- * the JmsResource instance that has been remotely closed.
+ * the JmsResource instance that has been closed.
* @param cause
* optional exception object that indicates the cause of the close.
*/
- void onResourceRemotelyClosed(JmsResource resource, Exception cause);
+ void onResourceClosed(JmsResource resource, Exception cause);
/**
* Called to indicate that a some client operation caused or received an
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index e5c5799..8574e04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -184,8 +184,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
}
@Override
- public void onResourceRemotelyClosed(JmsResource resource, Exception cause) {
- listener.onResourceRemotelyClosed(resource, cause);
+ public void onResourceClosed(JmsResource resource, Exception cause) {
+ listener.onResourceClosed(resource, cause);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 81f8657..7476d9d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -144,8 +144,10 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
}
public void remotelyClosed(AmqpProvider provider) {
- Exception error = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+ locallyClosed(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()));
+ }
+ public void locallyClosed(AmqpProvider provider, Exception error) {
if (parent != null) {
parent.removeChildResource(this);
}
@@ -159,7 +161,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
if (getResourceInfo() instanceof JmsConnectionInfo) {
provider.fireProviderException(error);
} else {
- provider.fireResourceRemotelyClosed(getResourceInfo(), error);
+ provider.fireResourceClosed(getResourceInfo(), error);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 8f5e651..a8ef32d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -112,6 +112,23 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// the peer sees the update.
stopRequest = request;
receiver.drain(0);
+
+ if (getDrainTimeout() > 0) {
+ // If the remote doesn't respond we will close the consumer and break any
+ // blocked receive or stop calls that are waiting.
+ final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
+ @Override
+ public void run() {
+ LOG.trace("Consumer {} drain request timed out", getConsumerId());
+ IOException error = new IOException("Remote did not respond to a drain request in time");
+ locallyClosed(session.getProvider(), error);
+ stopRequest.onFailure(error);
+ session.getProvider().pumpToProtonTransport(stopRequest);
+ }
+ }, getDrainTimeout());
+
+ stopRequest = new ScheduledRequest(future, stopRequest);
+ }
}
}
@@ -124,14 +141,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
LOG.trace("Consumer {} running scheduled stop", getConsumerId());
if (getEndpoint().getRemoteCredit() != 0) {
stop(request);
- // TODO: We close the proton transport head to avoid this doing any writes if
- // the TCP transport has gone, but it might be good to also avoid trying here.
session.getProvider().pumpToProtonTransport(request);
}
}
}, timeout);
- stopRequest = new ScheduledStopRequest(future, request);
+ stopRequest = new ScheduledRequest(future, request);
}
@Override
@@ -488,6 +503,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
this.presettle = presettle;
}
+ public int getDrainTimeout() {
+ return session.getProvider().getDrainTimeout();
+ }
+
@Override
public String toString() {
return "AmqpConsumer { " + getResourceInfo().getId() + " }";
@@ -544,26 +563,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
//----- Inner classes used in message pull operations --------------------//
- protected static final class ScheduledStopRequest implements AsyncResult {
+ protected static final class ScheduledRequest implements AsyncResult {
- private final ScheduledFuture<?> sheduledStopTask;
+ private final ScheduledFuture<?> sheduledTask;
private final AsyncResult origRequest;
- public ScheduledStopRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
- this.sheduledStopTask = completionTask;
+ public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
+ this.sheduledTask = completionTask;
this.origRequest = origRequest;
}
@Override
- public void onFailure(Throwable t) {
- sheduledStopTask.cancel(false);
- origRequest.onFailure(t);
+ public void onFailure(Throwable cause) {
+ sheduledTask.cancel(false);
+ origRequest.onFailure(cause);
}
@Override
public void onSuccess() {
- boolean cancelled = sheduledStopTask.cancel(false);
- if(cancelled) {
+ boolean cancelled = sheduledTask.cancel(false);
+ if (cancelled) {
// Signal completion. Otherwise wait for the scheduled task to do it.
origRequest.onSuccess();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5520ec4..8e25c86 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -16,10 +16,6 @@
*/
package org.apache.qpid.jms.provider.amqp;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.util.ReferenceCountUtil;
-
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -77,6 +73,10 @@ import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.ReferenceCountUtil;
+
/**
* An AMQP v1.0 Provider.
*
@@ -118,6 +118,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
private int channelMax = DEFAULT_CHANNEL_MAX;
private int idleTimeout = 60000;
+ private int drainTimeout = 60000;
private long sessionOutoingWindow = -1; //Use proton default
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
@@ -924,10 +925,10 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
- void fireResourceRemotelyClosed(JmsResource resource, Exception ex) {
+ void fireResourceClosed(JmsResource resource, Exception ex) {
ProviderListener listener = this.listener;
if (listener != null) {
- listener.onResourceRemotelyClosed(resource, ex);
+ listener.onResourceClosed(resource, ex);
}
}
@@ -1029,6 +1030,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
this.idleTimeout = idleTimeout;
}
+ public int getDrainTimeout() {
+ return drainTimeout;
+ }
+
+ /**
+ * Sets the drain timeout (in milliseconds) after which a consumer will be
+ * treated as having failed and will be closed, because the state of the
+ * remote is unknown when it has not responded to the requested drain.
+ *
+ * @param drainTimeout
+ * the drainTimeout to use for receiver links.
+ */
+ public void setDrainTimeout(int drainTimeout) {
+ this.drainTimeout = drainTimeout;
+ }
+
public int getMaxFrameSize() {
return maxFrameSize;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
index 80de69a..9186eba 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
@@ -49,14 +49,14 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener {
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 5dc4e37..4ca76b7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -128,7 +128,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception exception) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
consumerClosed.set(true);
}
});
@@ -754,6 +754,67 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
}
+ @Test(timeout=30000)
+ public void testReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
+ doDrainWithNoResponseOnNoMessageTestImpl(false);
+ }
+
+ @Test(timeout=30000)
+ public void testReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
+ doDrainWithNoResponseOnNoMessageTestImpl(true);
+ }
+
+ private void doDrainWithNoResponseOnNoMessageTestImpl(boolean noWait) throws JMSException, InterruptedException, Exception, IOException {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = null;
+ connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=500");
+
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expect receiver link attach and send credit
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+
+ // Expect drain but do not respond so that the consumer times out.
+ testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+
+ // Consumer should close after timing out while waiting for the drain response.
+ testPeer.expectDetach(true, true, true);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ try {
+ if (noWait) {
+ consumer.receiveNoWait();
+ } else {
+ consumer.receive(1);
+ }
+
+ fail("Drain timeout should have aborted the receive.");
+ } catch (JMSException ex) {
+ LOG.info("Receive failed after drain timeout as expected: {}", ex.getMessage());
+ }
+
+ try {
+ consumer.getMessageSelector();
+ fail("Should be closed and throw an exception");
+ } catch (JMSException ex) {
+ }
+
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
/* Check the clients view of the remaining credit stays in sync with the transports
* even in the face of the remote peer advancing the delivery count unexpectedly,
* ensuring the client doesn't later think there is credit when there is none.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 2e49080..485e5d2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -802,7 +802,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception exception) {
+ public void onProducerClosed(MessageProducer producer, Exception exception) {
producerClosed.set(true);
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cf4c0a6..53a4c43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1271,7 +1271,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
sessionClosed.set(true);
}
});
@@ -1334,7 +1334,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
sessionClosed.set(true);
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
index 5c15770..e076455 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
@@ -77,15 +77,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
});
@@ -145,15 +145,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
index 19d8a0a..e0939fb 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
@@ -209,15 +209,15 @@ public class FileWatcherDiscoveryTest extends AmqpTestSupport {
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
index 24b1a09..dee33f2 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -224,14 +224,14 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index d536b1e..47c7bfc 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -360,15 +360,15 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
}
@Override
- public void onSessionRemotelyClosed(Session session, Exception exception) {
+ public void onSessionClosed(Session session, Exception exception) {
}
@Override
- public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+ public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
}
@Override
- public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+ public void onProducerClosed(MessageProducer producer, Exception cause) {
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org