You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/19 16:34:06 UTC

qpid-jms git commit: /QPIDJMS-126 skip sending the disposition if the TX has failed already.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 974a8510b -> c4c79e5c5


/QPIDJMS-126 skip sending the disposition if the TX has failed already.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c4c79e5c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c4c79e5c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c4c79e5c

Branch: refs/heads/master
Commit: c4c79e5c549e643fb5bcc03222ffbf955b6b8da1
Parents: 974a851
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 19 10:30:42 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 19 10:30:42 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  6 +++
 .../TransactionsIntegrationTest.java            | 52 +++++++++++++++++++-
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c4c79e5c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 9d95fa4..877cd1b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -218,6 +218,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             LOG.debug("Consumed Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
                 if (session.isTransacted() && !getResourceInfo().isBrowser()) {
+
+                    if (session.isTransactionFailed()) {
+                        LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
+                        return;
+                    }
+
                     Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                     if (txnId != null) {
                         TransactionalState txState = new TransactionalState();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c4c79e5c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index be73403..dbf6f3f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSec
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
@@ -721,7 +722,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test // (timeout=20000)
+    @Test(timeout=20000)
     public void testSendAfterCoordinatorLinkClosedDuringTX() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
@@ -764,4 +765,53 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testReceiveAfterCoordinatorLinkClosedDuringTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer and send it an initial message for receive to process.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+            // Close the coordinator link; the messages should now just get dropped on the floor.
+            testPeer.remotelyCloseLastCoordinatorLink();
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            // Receiving the message would normally ack it, but since the TX has failed
+            // this should not result in a disposition going out.
+            Message received = consumer.receive();
+            assertNotNull(received);
+
+            // Expect that a new link will be created in order to start the next TX.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            try {
+                session.commit();
+                fail("Commit operation should have failed.");
+            } catch (TransactionRolledBackException jmsTxRb) {
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org