You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/23 23:21:17 UTC
qpid-jms git commit: QPIDJMS-157 Add timeout handling to close of
resources like consumer, producer, session,
etc. Fix possible double close call when producer is closed.
Repository: qpid-jms
Updated Branches:
refs/heads/master dd97edb72 -> f8470ec1b
QPIDJMS-157 Add timeout handling to close of resources like consumer,
producer, session, etc. Fix possible double close call when producer is
closed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f8470ec1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f8470ec1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f8470ec1
Branch: refs/heads/master
Commit: f8470ec1bc9f7280233593c5809c443a55f3ac92
Parents: dd97edb
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 23 18:21:09 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 23 18:21:09 2016 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpAbstractResource.java | 34 ++++++++++++++++++--
.../provider/amqp/AmqpConnectionSession.java | 6 ++--
.../jms/provider/amqp/AmqpFixedProducer.java | 4 +--
.../integration/ConsumerIntegrationTest.java | 30 +++++++++++++++++
.../integration/ProducerIntegrationTest.java | 29 +++++++++++++++++
.../jms/integration/SessionIntegrationTest.java | 26 +++++++++++++++
6 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index a075900..81f8657 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -17,7 +17,9 @@
package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.AsyncResult;
@@ -41,6 +43,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
protected AsyncResult closeRequest;
+ protected ScheduledFuture<?> closeTimeoutTask;
private final E endpoint;
private final R resourceInfo;
@@ -97,6 +100,35 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
closeRequest = request;
+ long closeTimeout = getParent().getProvider().getRequestTimeout();
+ if (closeTimeout != JmsConnectionInfo.INFINITE) {
+ closeTimeoutTask = getParent().getProvider().scheduleRequestTimeout(
+ new AsyncResult() {
+
+ @Override
+ public void onSuccess() {
+ // Not called in this context.
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ closeRequest.onFailure(result);
+ closeRequest = null;
+
+ // This ensures that the resource gets properly cleaned
+ // up, the request will have already completed so there
+ // won't be multiple events fired.
+ resourceClosed();
+ }
+
+ @Override
+ public boolean isComplete() {
+ return closeRequest != null ? closeRequest.isComplete() : true;
+ }
+
+ }, closeTimeout, new JmsOperationTimedOutException("Timed Out Waiting for close response: " + this));
+ }
+
closeOrDetachEndpoint();
}
@@ -124,8 +156,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
endpoint.close();
}
- LOG.info("Resource {} was remotely closed", getResourceInfo());
-
if (getResourceInfo() instanceof JmsConnectionInfo) {
provider.fireProviderException(error);
} else {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 162feb4..04fc489 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -77,8 +77,8 @@ public class AmqpConnectionSession extends AmqpSession {
private class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {
- public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver) {
- super(resource, receiver);
+ public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver, AmqpResourceParent parent) {
+ super(resource, receiver, parent);
}
public String getSubscriptionName() {
@@ -108,7 +108,7 @@ public class AmqpConnectionSession extends AmqpSession {
@Override
protected DurableSubscriptionReattach createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Receiver endpoint) {
- return new DurableSubscriptionReattach(resourceInfo, endpoint);
+ return new DurableSubscriptionReattach(resourceInfo, endpoint, getProvider());
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index b582052..f548096 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -194,7 +194,7 @@ public class AmqpFixedProducer extends AmqpProducer {
LOG.trace("Dispatching previously held send");
InFlightSend held = blocked.pop();
try {
- doSend(held.envelope, held); // TODO - Cancel timeout and reset after dispatch ?
+ doSend(held.envelope, held);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
@@ -202,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer {
}
// Once the pending sends queue is drained we can propagate the close request.
- if (blocked.isEmpty() && isAwaitingClose()) {
+ if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
super.close(closeRequest);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index f7715a9..762f345 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -43,6 +43,7 @@ import javax.jms.Topic;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsPrefetchPolicy;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -89,6 +90,35 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCloseConsumerTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+
+ testPeer.expectBegin();
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+ testPeer.expectDetach(true, false, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ try {
+ consumer.close();
+ fail("Should have thrown a timed out exception");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ LOG.info("Caught excpected exception", jmsEx);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testRemotelyCloseConsumer() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index de1e6b3..f65a47a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -49,6 +49,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -99,6 +100,34 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCloseSenderTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.expectDetach(true, false, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ try {
+ producer.close();
+ fail("Should have thrown a timed out exception");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ LOG.info("Caught excpected exception", jmsEx);
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testSentTextMessageCanBeModified() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 7275cac..e16fa55 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -112,6 +112,32 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCloseSessionTimesOut() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setRequestTimeout(500);
+
+ testPeer.expectBegin();
+ testPeer.expectEnd(false);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull("Session should not be null", session);
+
+ try {
+ session.close();
+ fail("Should have thrown an timed out exception");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ LOG.info("Caught exception: {}", jmsEx.getMessage());
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateProducer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org