You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/22 23:22:10 UTC

qpid-jms git commit: QPIDJMS-125 Add a couple of tests for consumer close when in a transacted session.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 430f0d5fb -> e05794a5e


QPIDJMS-125 Add a couple of tests for consumer close when in a transacted
session.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e05794a5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e05794a5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e05794a5

Branch: refs/heads/master
Commit: e05794a5e4651f5dac63346edcef598e1463cc58
Parents: 430f0d5
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 22 17:22:00 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 22 17:22:00 2015 -0400

----------------------------------------------------------------------
 .../TransactionsIntegrationTest.java            | 84 ++++++++++++++++++--
 1 file changed, 76 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e05794a5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index dbf6f3f..7aeaedc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -38,6 +39,7 @@ import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
@@ -307,15 +309,20 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(1, 1);
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1, false);
     }
 
     @Test(timeout=20000)
     public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(5, 2);
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2, false);
     }
 
-    private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
+    @Test(timeout=20000)
+    public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndClose() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1, true);
+    }
+
+    private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -356,6 +363,55 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // and reply with accepted and settled disposition to indicate the commit succeeded
             testPeer.expectDischarge(txnId, false);
 
+            // Expect the consumer to close now
+            if (closeConsumer) {
+                testPeer.expectDetach(true, true, true);
+                messageConsumer.close();
+            }
+
+            // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectDeclare(txnId);
+
+            session.commit();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConsumerWithNoMessageCanCloseBeforeCommit() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectDetach(true, true, true);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            assertNull(messageConsumer.receiveNoWait());
+
+            messageConsumer.close();
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the commit succeeded
+            testPeer.expectDischarge(txnId, false);
+
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
@@ -413,15 +469,20 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
-        doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
+        doRollbackTransactedSessionWithConsumerTestImpl(1, 1, false);
+    }
+
+    @Test(timeout=20000)
+    public void testRollbackTransactedSessionWithConsumerReceivingAllMessagesThenCloses() throws Exception {
+        doRollbackTransactedSessionWithConsumerTestImpl(1, 1, true);
     }
 
     @Test(timeout=20000)
     public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
-        doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
+        doRollbackTransactedSessionWithConsumerTestImpl(5, 2, false);
     }
 
-    private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
+    private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -465,6 +526,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // and reply with accepted and settled disposition to indicate the rollback succeeded
             testPeer.expectDischarge(txnId, true);
 
+            if (closeConsumer) {
+                testPeer.expectDetach(true, true, true);
+                messageConsumer.close();
+            }
+
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
@@ -476,8 +542,10 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 testPeer.expectDisposition(true, new ReleasedMatcher());
             }
 
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
+            if (!closeConsumer) {
+                // Expect the consumer to be 'started' again as rollback completes
+                testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
+            }
 
             session.rollback();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org