You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/06 21:54:10 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-175

Repository: qpid-jms
Updated Branches:
  refs/heads/master 974037eac -> 926fc571d


https://issues.apache.org/jira/browse/QPIDJMS-175

Add amqp.drainTimeout to handle the case where the remote doesn't send
any response for drain request.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/926fc571
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/926fc571
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/926fc571

Branch: refs/heads/master
Commit: 926fc571da5023137bb42035d84139b8da3a981d
Parents: 974037e
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 6 17:54:00 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 6 17:54:00 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 14 ++---
 .../apache/qpid/jms/JmsConnectionListener.java  | 24 ++++----
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  2 +-
 .../java/org/apache/qpid/jms/JmsSession.java    | 16 ++---
 .../jms/provider/DefaultProviderListener.java   |  2 +-
 .../qpid/jms/provider/ProviderListener.java     | 10 ++--
 .../qpid/jms/provider/ProviderWrapper.java      |  4 +-
 .../jms/provider/amqp/AmqpAbstractResource.java |  6 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 43 +++++++++----
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 29 +++++++--
 .../qpid/jms/JmsDefaultConnectionListener.java  |  6 +-
 .../integration/ConsumerIntegrationTest.java    | 63 +++++++++++++++++++-
 .../integration/ProducerIntegrationTest.java    |  2 +-
 .../jms/integration/SessionIntegrationTest.java |  4 +-
 .../JmsConsumerPriorityDispatchTest.java        | 12 ++--
 .../jms/discovery/FileWatcherDiscoveryTest.java |  6 +-
 .../jms/discovery/JmsAmqpDiscoveryTest.java     |  6 +-
 .../transactions/JmsTransactedConsumerTest.java |  6 +-
 18 files changed, 178 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 4374954..ab9fb7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1194,7 +1194,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     @Override
-    public void onResourceRemotelyClosed(final JmsResource resource, final Exception cause) {
+    public void onResourceClosed(final JmsResource resource, final Exception cause) {
         // Closure of the Connection itself is notified via onConnectionFailure
 
         // Run on the connection executor to free the provider to go do more work and avoid
@@ -1207,19 +1207,19 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                     if (resource instanceof JmsSessionInfo) {
                         JmsSession session = sessions.get(resource.getId());
                         if (session != null) {
-                            session.remotelyClosed(cause);
+                            session.sessionClosed(cause);
                             for (JmsConnectionListener listener : connectionListeners) {
-                                listener.onSessionRemotelyClosed(session, cause);
+                                listener.onSessionClosed(session, cause);
                             }
                         }
                     } else if (resource instanceof JmsProducerInfo) {
                         JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
                         JmsSession session = sessions.get(parentId);
                         if (session != null) {
-                            JmsMessageProducer producer = session.producerRemotelyClosed((JmsProducerInfo) resource, cause);
+                            JmsMessageProducer producer = session.producerClosed((JmsProducerInfo) resource, cause);
                             if (producer != null) {
                                 for (JmsConnectionListener listener : connectionListeners) {
-                                    listener.onProducerRemotelyClosed(producer, cause);
+                                    listener.onProducerClosed(producer, cause);
                                 }
                             }
                         }
@@ -1227,10 +1227,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                         JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
                         JmsSession session = sessions.get(parentId);
                         if (session != null) {
-                            JmsMessageConsumer consumer = session.consumerRemotelyClosed((JmsConsumerInfo) resource, cause);
+                            JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
                             if (consumer != null) {
                                 for (JmsConnectionListener listener : connectionListeners) {
-                                    listener.onConsumerRemotelyClosed(consumer, cause);
+                                    listener.onConsumerClosed(consumer, cause);
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
index f06f343..7aecc44 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -75,33 +75,33 @@ public interface JmsConnectionListener {
     void onInboundMessage(JmsInboundMessageDispatch envelope);
 
     /**
-     * Called when the remote peer closes a session.
+     * Called when the session is closed due to remote action or local error detection.
      *
      * @param session
-     *      The session that was closed on the remote end.
+     *      The session that was closed and needs to be cleaned up.
      * @param cause
-     *      The exception that provides additional context on the remote closure.
+     *      The exception that provides additional context on the closure.
      */
-    void onSessionRemotelyClosed(Session session, Exception cause);
+    void onSessionClosed(Session session, Exception cause);
 
     /**
-     * Called when the remote peer closes a MessageConsumer.
+     * Called when the MessageConsumer is closed due to remote action or local error detection.
      *
      * @param consumer
-     *      The consumer that was closed on the remote end.
+     *      The consumer that was closed and needs to be cleaned up.
      * @param cause
-     *      The exception that provides additional context on the remote closure.
+     *      The exception that provides additional context on the closure.
      */
-    void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause);
+    void onConsumerClosed(MessageConsumer consumer, Exception cause);
 
     /**
-     * Called when the remote peer closes a MessageProducer.
+     * Called when the MessageProducer is closed due to remote action or local error detection.
      *
      * @param producer
-     *      The producer that was closed on the remote end.
+     *      The producer that was closed and needs to be cleaned up.
      * @param cause
-     *      The exception that provides additional context on the remote closure.
+     *      The exception that provides additional context on the closure.
      */
-    void onProducerRemotelyClosed(MessageProducer producer, Exception cause);
+    void onProducerClosed(MessageProducer producer, Exception cause);
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 46f19ca..d63af7c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -257,7 +257,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                         // is redundant: zero-prefetch consumers already pull, and
                         // the rest block indefinitely on the local messageQueue.
                         pullForced = true;
-                        if(performPullIfRequired(timeout, true)) {
+                        if (performPullIfRequired(timeout, true)) {
                             startConsumerResource();
                             // We refresh credit if it is a prefetching consumer, since the
                             // pull drained it. Processing acks can open the credit window, but

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 64c514e..3569fc7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -268,16 +268,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
-    void remotelyClosed(Exception cause) {
+    void sessionClosed(Exception cause) {
         try {
             shutdown(cause);
         } catch (Throwable error) {
-            LOG.trace("Ignoring exception thrown during cleanup of remotely closed session", error);
+            LOG.trace("Ignoring exception thrown during cleanup of closed session", error);
         }
     }
 
-    JmsMessageConsumer consumerRemotelyClosed(JmsConsumerInfo resource, Exception cause) {
-        LOG.info("A JMS MessageConsumer has been remotely closed: {}", resource);
+    JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Exception cause) {
+        LOG.info("A JMS MessageConsumer has been closed: {}", resource);
 
         JmsMessageConsumer consumer = consumers.get(resource.getId());
 
@@ -286,14 +286,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                 consumer.shutdown(cause);
             }
         } catch (Throwable error) {
-            LOG.trace("Ignoring exception thrown during cleanup of remotely closed consumer", error);
+            LOG.trace("Ignoring exception thrown during cleanup of closed consumer", error);
         }
 
         return consumer;
     }
 
-    JmsMessageProducer producerRemotelyClosed(JmsProducerInfo resource, Exception cause) {
-        LOG.info("A JMS MessageProducer has been remotely closed: {}", resource);
+    JmsMessageProducer producerClosed(JmsProducerInfo resource, Exception cause) {
+        LOG.info("A JMS MessageProducer has been closed: {}", resource);
 
         JmsMessageProducer producer = producers.get(resource.getId());
 
@@ -302,7 +302,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                 producer.shutdown(cause);
             }
         } catch (Throwable error) {
-            LOG.trace("Ignoring exception thrown during cleanup of remotely closed producer", error);
+            LOG.trace("Ignoring exception thrown during cleanup of closed producer", error);
         }
 
         return producer;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index b8ad9d2..22e204e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -56,7 +56,7 @@ public class DefaultProviderListener implements ProviderListener {
     }
 
     @Override
-    public void onResourceRemotelyClosed(JmsResource resource, Exception cause) {
+    public void onResourceClosed(JmsResource resource, Exception cause) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index ebb653d..5c758ed 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -112,15 +112,17 @@ public interface ProviderListener {
     void onConnectionFailure(IOException ex);
 
     /**
-     * Called to indicate that a currently active resource has been closed on the
-     * remote end due to management or other action.
+     * Called to indicate that a currently active resource has been closed
+     * due to some error condition, management request or some other action.
+     * This can either be initiated remotely or locally depending on the
+     * condition that triggers the close.
      *
      * @param resource
-     *        the JmsResource instance that has been remotely closed.
+     *        the JmsResource instance that has been closed.
      * @param cause
      *        optional exception object that indicates the cause of the close.
      */
-    void onResourceRemotelyClosed(JmsResource resource, Exception cause);
+    void onResourceClosed(JmsResource resource, Exception cause);
 
     /**
      * Called to indicate that a some client operation caused or received an

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index e5c5799..8574e04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -184,8 +184,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
-    public void onResourceRemotelyClosed(JmsResource resource, Exception cause) {
-        listener.onResourceRemotelyClosed(resource, cause);
+    public void onResourceClosed(JmsResource resource, Exception cause) {
+        listener.onResourceClosed(resource, cause);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 81f8657..7476d9d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -144,8 +144,10 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     }
 
     public void remotelyClosed(AmqpProvider provider) {
-        Exception error = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+        locallyClosed(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()));
+    }
 
+    public void locallyClosed(AmqpProvider provider, Exception error) {
         if (parent != null) {
             parent.removeChildResource(this);
         }
@@ -159,7 +161,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         if (getResourceInfo() instanceof JmsConnectionInfo) {
             provider.fireProviderException(error);
         } else {
-            provider.fireResourceRemotelyClosed(getResourceInfo(), error);
+            provider.fireResourceClosed(getResourceInfo(), error);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 8f5e651..a8ef32d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -112,6 +112,23 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             // the peer sees the update.
             stopRequest = request;
             receiver.drain(0);
+
+            if (getDrainTimeout() > 0) {
+                // If the remote doesn't respond we will close the consumer and break any
+                // blocked receive or stop calls that are waiting.
+                final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.trace("Consumer {} drain request timed out", getConsumerId());
+                        IOException error = new IOException("Remote did not respond to a drain request in time");
+                        locallyClosed(session.getProvider(), error);
+                        stopRequest.onFailure(error);
+                        session.getProvider().pumpToProtonTransport(stopRequest);
+                    }
+                }, getDrainTimeout());
+
+                stopRequest = new ScheduledRequest(future, stopRequest);
+            }
         }
     }
 
@@ -124,14 +141,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 LOG.trace("Consumer {} running scheduled stop", getConsumerId());
                 if (getEndpoint().getRemoteCredit() != 0) {
                     stop(request);
-                    // TODO: We close the proton transport head to avoid this doing any writes if
-                    // the TCP transport has gone, but it might be good to also avoid trying here.
                     session.getProvider().pumpToProtonTransport(request);
                 }
             }
         }, timeout);
 
-        stopRequest = new ScheduledStopRequest(future, request);
+        stopRequest = new ScheduledRequest(future, request);
     }
 
     @Override
@@ -488,6 +503,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         this.presettle = presettle;
     }
 
+    public int getDrainTimeout() {
+        return session.getProvider().getDrainTimeout();
+    }
+
     @Override
     public String toString() {
         return "AmqpConsumer { " + getResourceInfo().getId() + " }";
@@ -544,26 +563,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     //----- Inner classes used in message pull operations --------------------//
 
-    protected static final class ScheduledStopRequest implements AsyncResult {
+    protected static final class ScheduledRequest implements AsyncResult {
 
-        private final ScheduledFuture<?> sheduledStopTask;
+        private final ScheduledFuture<?> sheduledTask;
         private final AsyncResult origRequest;
 
-        public ScheduledStopRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
-            this.sheduledStopTask = completionTask;
+        public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
+            this.sheduledTask = completionTask;
             this.origRequest = origRequest;
         }
 
         @Override
-        public void onFailure(Throwable t) {
-            sheduledStopTask.cancel(false);
-            origRequest.onFailure(t);
+        public void onFailure(Throwable cause) {
+            sheduledTask.cancel(false);
+            origRequest.onFailure(cause);
         }
 
         @Override
         public void onSuccess() {
-            boolean cancelled = sheduledStopTask.cancel(false);
-            if(cancelled) {
+            boolean cancelled = sheduledTask.cancel(false);
+            if (cancelled) {
                 // Signal completion. Otherwise wait for the scheduled task to do it.
                 origRequest.onSuccess();
             }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5520ec4..8e25c86 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -16,10 +16,6 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.util.ReferenceCountUtil;
-
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -77,6 +73,10 @@ import org.apache.qpid.proton.framing.TransportFrame;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.ReferenceCountUtil;
+
 /**
  * An AMQP v1.0 Provider.
  *
@@ -118,6 +118,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
     private int channelMax = DEFAULT_CHANNEL_MAX;
     private int idleTimeout = 60000;
+    private int drainTimeout = 60000;
     private long sessionOutoingWindow = -1; //Use proton default
     private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
 
@@ -924,10 +925,10 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    void fireResourceRemotelyClosed(JmsResource resource, Exception ex) {
+    void fireResourceClosed(JmsResource resource, Exception ex) {
         ProviderListener listener = this.listener;
         if (listener != null) {
-            listener.onResourceRemotelyClosed(resource, ex);
+            listener.onResourceClosed(resource, ex);
         }
     }
 
@@ -1029,6 +1030,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         this.idleTimeout = idleTimeout;
     }
 
+    public int getDrainTimeout() {
+        return drainTimeout;
+    }
+
+    /**
+     * Sets the drain timeout (in milliseconds) after which a consumer will be
+     * treated as having failed and will be closed due to unknown state of the
+     * remote having not responded to the requested drain.
+     *
+     * @param drainTimeout
+     *      the drainTimeout to use for receiver links.
+     */
+    public void setDrainTimeout(int drainTimeout) {
+        this.drainTimeout = drainTimeout;
+    }
+
     public int getMaxFrameSize() {
         return maxFrameSize;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
index 80de69a..9186eba 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
@@ -49,14 +49,14 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener {
     }
 
     @Override
-    public void onSessionRemotelyClosed(Session session, Exception exception) {
+    public void onSessionClosed(Session session, Exception exception) {
     }
 
     @Override
-    public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+    public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
     }
 
     @Override
-    public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+    public void onProducerClosed(MessageProducer producer, Exception cause) {
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 5dc4e37..4ca76b7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -128,7 +128,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception exception) {
+                public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
                     consumerClosed.set(true);
                 }
             });
@@ -754,6 +754,67 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout=30000)
+    public void testReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
+        doDrainWithNoResponseOnNoMessageTestImpl(false);
+    }
+
+    @Test(timeout=30000)
+    public void testReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
+        doDrainWithNoResponseOnNoMessageTestImpl(true);
+    }
+
+    private void doDrainWithNoResponseOnNoMessageTestImpl(boolean noWait) throws JMSException, InterruptedException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = null;
+            connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=500");
+
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expect receiver link attach and send credit
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+
+            // Expect drain but do not respond so that the consumer times out.
+            testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+
+            // Consumer should close due to timed waiting for drain.
+            testPeer.expectDetach(true, true, true);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            try {
+                if (noWait) {
+                    consumer.receiveNoWait();
+                } else {
+                    consumer.receive(1);
+                }
+
+                fail("Drain timeout should have aborted the receive.");
+            } catch (JMSException ex) {
+                LOG.info("Receive failed after drain timeout as expected: {}", ex.getMessage());
+            }
+
+            try {
+                consumer.getMessageSelector();
+                fail("Should be closed and throw an exception");
+            } catch (JMSException ex) {
+            }
+
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
     /* Check the clients view of the remaining credit stays in sync with the transports
      * even in the face of the remote peer advancing the delivery count unexpectedly,
      * ensuring the client doesn't later think there is credit when there is none.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 2e49080..485e5d2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -802,7 +802,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onProducerRemotelyClosed(MessageProducer producer, Exception exception) {
+                public void onProducerClosed(MessageProducer producer, Exception exception) {
                     producerClosed.set(true);
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cf4c0a6..53a4c43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1271,7 +1271,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionRemotelyClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Exception exception) {
                     sessionClosed.set(true);
                 }
             });
@@ -1334,7 +1334,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
-                public void onSessionRemotelyClosed(Session session, Exception exception) {
+                public void onSessionClosed(Session session, Exception exception) {
                     sessionClosed.set(true);
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
index 5c15770..e076455 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java
@@ -77,15 +77,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionRemotelyClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Exception exception) {
             }
 
             @Override
-            public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
             }
 
             @Override
-            public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Exception cause) {
             }
         });
 
@@ -145,15 +145,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionRemotelyClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Exception exception) {
             }
 
             @Override
-            public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
             }
 
             @Override
-            public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Exception cause) {
             }
         });
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
index 19d8a0a..e0939fb 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
@@ -209,15 +209,15 @@ public class FileWatcherDiscoveryTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionRemotelyClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Exception exception) {
             }
 
             @Override
-            public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
             }
 
             @Override
-            public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Exception cause) {
             }
         });
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
index 24b1a09..dee33f2 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -224,14 +224,14 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti
     }
 
     @Override
-    public void onSessionRemotelyClosed(Session session, Exception exception) {
+    public void onSessionClosed(Session session, Exception exception) {
     }
 
     @Override
-    public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+    public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
     }
 
     @Override
-    public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+    public void onProducerClosed(MessageProducer producer, Exception cause) {
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index d536b1e..47c7bfc 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -360,15 +360,15 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
             }
 
             @Override
-            public void onSessionRemotelyClosed(Session session, Exception exception) {
+            public void onSessionClosed(Session session, Exception exception) {
             }
 
             @Override
-            public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) {
+            public void onConsumerClosed(MessageConsumer consumer, Exception cause) {
             }
 
             @Override
-            public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) {
+            public void onProducerClosed(MessageProducer producer, Exception cause) {
             }
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org