You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/01/11 21:25:36 UTC
qpid-jms git commit: QPIDJMS-249 Don't close consumer link that is in
a TX on stop timeout
Repository: qpid-jms
Updated Branches:
refs/heads/master 56d1b6654 -> bb91ce125
QPIDJMS-249 Don't close consumer link that is in a TX on stop timeout
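If a consumer is being stopped and its drain request times out, do not close the
consumer link when that consumer is participating in a transaction; instead, fail
the pending stop request and leave the consumer open so the transaction can use it.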
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bb91ce12
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bb91ce12
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bb91ce12
Branch: refs/heads/master
Commit: bb91ce125a805b4216e0e55669e1404662e43613
Parents: 56d1b66
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 11 16:25:08 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 11 16:25:22 2017 -0500
----------------------------------------------------------------------
.../apache/qpid/jms/provider/amqp/AmqpConsumer.java | 13 ++++++++++---
.../qpid/jms/provider/amqp/AmqpTransactionContext.java | 8 ++++++++
.../jms/integration/TransactionsIntegrationTest.java | 6 ------
3 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index fb818f4..9f9cd42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -120,14 +120,21 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (getDrainTimeout() > 0) {
// If the remote doesn't respond we will close the consumer and break any
- // blocked receive or stop calls that are waiting.
+ // blocked receive or stop calls that are waiting, unless the consumer is
+ // a participant in a transaction in which case we will just fail the request
+ // and leave the consumer open since the TX needs it to remain active.
final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
@Override
public void run() {
LOG.trace("Consumer {} drain request timed out", getConsumerId());
Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
- closeResource(session.getProvider(), cause, false);
- session.getProvider().pumpToProtonTransport();
+ if (session.isTransacted() && session.getTransactionContext().isInTransaction(AmqpConsumer.this)) {
+ stopRequest.onFailure(cause);
+ stopRequest = null;
+ } else {
+ closeResource(session.getProvider(), cause, false);
+ session.getProvider().pumpToProtonTransport();
+ }
}
}, getDrainTimeout());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 4ea7465..8f3aab8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -174,10 +174,18 @@ public class AmqpTransactionContext implements AmqpResourceParent {
txConsumers.add(consumer);
}
+ public boolean isInTransaction(AmqpConsumer consumer) {
+ return txConsumers.contains(consumer);
+ }
+
public void registerTxProducer(AmqpProducer producer) {
txProducers.add(producer);
}
+ public boolean isInTransaction(AmqpProducer producer) {
+ return txProducers.contains(producer);
+ }
+
public AmqpSession getSession() {
return session;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 6ae09d5..f9f9beb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1335,16 +1335,10 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));
- // Expect the consumer to be closed after drain timeout
- testPeer.expectDetach(true, true, true);
-
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
testPeer.expectDischarge(txnId, true);
- // Expect the release of the prefetched message from the consumer that was closed.
- //testPeer.expectDispositionThatIsReleasedAndSettled();
-
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org