You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/23 23:21:17 UTC

qpid-jms git commit: QPIDJMS-157 Add timeout handling to close of resources like consumer, producer, session, etc. Fix possible double close call when producer is closed.

Repository: qpid-jms
Updated Branches:
  refs/heads/master dd97edb72 -> f8470ec1b


QPIDJMS-157 Add timeout handling to the close of resources such as
consumers, producers, and sessions. Fix a possible double close call
when a producer is closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f8470ec1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f8470ec1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f8470ec1

Branch: refs/heads/master
Commit: f8470ec1bc9f7280233593c5809c443a55f3ac92
Parents: dd97edb
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 23 18:21:09 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 23 18:21:09 2016 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpAbstractResource.java | 34 ++++++++++++++++++--
 .../provider/amqp/AmqpConnectionSession.java    |  6 ++--
 .../jms/provider/amqp/AmqpFixedProducer.java    |  4 +--
 .../integration/ConsumerIntegrationTest.java    | 30 +++++++++++++++++
 .../integration/ProducerIntegrationTest.java    | 29 +++++++++++++++++
 .../jms/integration/SessionIntegrationTest.java | 26 +++++++++++++++
 6 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index a075900..81f8657 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -17,7 +17,9 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
 
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
@@ -41,6 +43,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
 
     protected AsyncResult closeRequest;
+    protected ScheduledFuture<?> closeTimeoutTask;
 
     private final E endpoint;
     private final R resourceInfo;
@@ -97,6 +100,35 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
 
         closeRequest = request;
 
+        long closeTimeout = getParent().getProvider().getRequestTimeout();
+        if (closeTimeout != JmsConnectionInfo.INFINITE) {
+            closeTimeoutTask = getParent().getProvider().scheduleRequestTimeout(
+                new AsyncResult() {
+
+                    @Override
+                    public void onSuccess() {
+                        // Not called in this context.
+                    }
+
+                    @Override
+                    public void onFailure(Throwable result) {
+                        closeRequest.onFailure(result);
+                        closeRequest = null;
+
+                        // This ensures that the resource gets properly cleaned
+                        // up, the request will have already completed so there
+                        // won't be multiple events fired.
+                        resourceClosed();
+                    }
+
+                    @Override
+                    public boolean isComplete() {
+                        return closeRequest != null ? closeRequest.isComplete() : true;
+                    }
+
+                }, closeTimeout, new JmsOperationTimedOutException("Timed Out Waiting for close response: " + this));
+        }
+
         closeOrDetachEndpoint();
     }
 
@@ -124,8 +156,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
             endpoint.close();
         }
 
-        LOG.info("Resource {} was remotely closed", getResourceInfo());
-
         if (getResourceInfo() instanceof JmsConnectionInfo) {
             provider.fireProviderException(error);
         } else {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 162feb4..04fc489 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -77,8 +77,8 @@ public class AmqpConnectionSession extends AmqpSession {
 
     private class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {
 
-        public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver) {
-            super(resource, receiver);
+        public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver, AmqpResourceParent parent) {
+            super(resource, receiver, parent);
         }
 
         public String getSubscriptionName() {
@@ -108,7 +108,7 @@ public class AmqpConnectionSession extends AmqpSession {
 
         @Override
         protected DurableSubscriptionReattach createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Receiver endpoint) {
-            return new DurableSubscriptionReattach(resourceInfo, endpoint);
+            return new DurableSubscriptionReattach(resourceInfo, endpoint, getProvider());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index b582052..f548096 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -194,7 +194,7 @@ public class AmqpFixedProducer extends AmqpProducer {
                 LOG.trace("Dispatching previously held send");
                 InFlightSend held = blocked.pop();
                 try {
-                    doSend(held.envelope, held);  // TODO - Cancel timeout and reset after dispatch ?
+                    doSend(held.envelope, held);
                 } catch (JMSException e) {
                     throw IOExceptionSupport.create(e);
                 }
@@ -202,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer {
         }
 
         // Once the pending sends queue is drained we can propagate the close request.
-        if (blocked.isEmpty() && isAwaitingClose()) {
+        if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
             super.close(closeRequest);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index f7715a9..762f345 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -43,6 +43,7 @@ import javax.jms.Topic;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -89,6 +90,35 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCloseConsumerTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectDetach(true, false, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            try {
+                consumer.close();
+                fail("Should have thrown a timed out exception");
+            } catch (JmsOperationTimedOutException jmsEx) {
+                LOG.info("Caught excpected exception", jmsEx);
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRemotelyCloseConsumer() throws Exception {
         final String BREAD_CRUMB = "ErrorMessage";
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index de1e6b3..f65a47a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -49,6 +49,7 @@ import javax.jms.TextMessage;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -99,6 +100,34 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCloseSenderTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.expectDetach(true, false, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            try {
+                producer.close();
+                fail("Should have thrown a timed out exception");
+            } catch (JmsOperationTimedOutException jmsEx) {
+                LOG.info("Caught excpected exception", jmsEx);
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testSentTextMessageCanBeModified() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 7275cac..e16fa55 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -112,6 +112,32 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCloseSessionTimesOut() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setRequestTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectEnd(false);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull("Session should not be null", session);
+
+            try {
+                session.close();
+                fail("Should have thrown an timed out exception");
+            } catch (JmsOperationTimedOutException jmsEx) {
+                LOG.info("Caught exception: {}", jmsEx.getMessage());
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateProducer() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org