You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/01/11 21:25:36 UTC

qpid-jms git commit: QPIDJMS-249 Don't close consumer link that is in a TX on stop timeout

Repository: qpid-jms
Updated Branches:
  refs/heads/master 56d1b6654 -> bb91ce125


QPIDJMS-249 Don't close consumer link that is in a TX on stop timeout

If the consumer is stopped and the request times out, do not close the
consumer link if that consumer is participating in a transaction.  



Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bb91ce12
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bb91ce12
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bb91ce12

Branch: refs/heads/master
Commit: bb91ce125a805b4216e0e55669e1404662e43613
Parents: 56d1b66
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 11 16:25:08 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 11 16:25:22 2017 -0500

----------------------------------------------------------------------
 .../apache/qpid/jms/provider/amqp/AmqpConsumer.java    | 13 ++++++++++---
 .../qpid/jms/provider/amqp/AmqpTransactionContext.java |  8 ++++++++
 .../jms/integration/TransactionsIntegrationTest.java   |  6 ------
 3 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index fb818f4..9f9cd42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -120,14 +120,21 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
             if (getDrainTimeout() > 0) {
                 // If the remote doesn't respond we will close the consumer and break any
-                // blocked receive or stop calls that are waiting.
+                // blocked receive or stop calls that are waiting, unless the consumer is
+                // a participant in a transaction in which case we will just fail the request
+                // and leave the consumer open since the TX needs it to remain active.
                 final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
                     @Override
                     public void run() {
                         LOG.trace("Consumer {} drain request timed out", getConsumerId());
                         Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
-                        closeResource(session.getProvider(), cause, false);
-                        session.getProvider().pumpToProtonTransport();
+                        if (session.isTransacted() && session.getTransactionContext().isInTransaction(AmqpConsumer.this)) {
+                            stopRequest.onFailure(cause);
+                            stopRequest = null;
+                        } else {
+                            closeResource(session.getProvider(), cause, false);
+                            session.getProvider().pumpToProtonTransport();
+                        }
                     }
                 }, getDrainTimeout());
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 4ea7465..8f3aab8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -174,10 +174,18 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         txConsumers.add(consumer);
     }
 
+    public boolean isInTransaction(AmqpConsumer consumer) {
+        return txConsumers.contains(consumer);
+    }
+
     public void registerTxProducer(AmqpProducer producer) {
         txProducers.add(producer);
     }
 
+    public boolean isInTransaction(AmqpProducer producer) {
+        return txProducers.contains(producer);
+    }
+
     public AmqpSession getSession() {
         return session;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bb91ce12/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 6ae09d5..f9f9beb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1335,16 +1335,10 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
             testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));
 
-            // Expect the consumer to be closed after drain timeout
-            testPeer.expectDetach(true, true, true);
-
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded
             testPeer.expectDischarge(txnId, true);
 
-            // Expect the release of the prefetched message from the consumer that was closed.
-            //testPeer.expectDispositionThatIsReleasedAndSettled();
-
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org