You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/15 23:38:57 UTC

qpid-jms git commit: QPIDJMS-264 Adds a couple tests for order of dispatch in a TX

Repository: qpid-jms
Updated Branches:
  refs/heads/master e343dd054 -> f3b9cd01f


QPIDJMS-264 Adds a couple tests for order of dispatch in a TX

Adds tests that verify that messages are dispatched to the consumer in
order while consuming within a transaction (TX).

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f3b9cd01
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f3b9cd01
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f3b9cd01

Branch: refs/heads/master
Commit: f3b9cd01fc19efb4e50a8b3a187c88019becee9f
Parents: e343dd0
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Feb 15 18:38:07 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Feb 15 18:38:12 2017 -0500

----------------------------------------------------------------------
 .../TransactionsIntegrationTest.java            | 68 ++++++++++++++++++++
 .../transactions/JmsTransactedConsumerTest.java | 35 +++++++++-
 2 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3b9cd01/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index bd8d997..867d2d9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -23,11 +23,14 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -44,6 +47,8 @@ import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
@@ -58,17 +63,22 @@ import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Tests for behavior of Transacted Session operations.
  */
+@RunWith(QpidJMSTestRunner.class)
 public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionsIntegrationTest.class);
@@ -1372,4 +1382,62 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=30000) // Checks that a consumer on a transacted session receives messages in ascending MESSAGE_NUMBER order
+    @Repeat(repetitions = 1)
+    public void testConsumerMessageOrderOnTransactedSession() throws IOException, Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            final int messageCount = 10;
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            // Expect the consumer creation below to attach an underlying receiver link
+            testPeer.expectReceiverAttach();
+            // Expect initial credit to be sent, respond with some messages that are tagged with
+            // a sequence number we can use to determine if order is maintained.
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
+                messageCount, false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);
+
+            for (int i = 1; i <= messageCount; i++) {
+                // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new AcceptedMatcher());
+
+                testPeer.expectDisposition(true, stateMatcher);
+            }
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int i = 0; i < messageCount; ++i) {
+                Message message = consumer.receive(500);
+                assertNotNull(message);
+                assertEquals(i, message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER)); // sequence numbers are asserted to start at 0
+            }
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectEnd();
+
+            session.close(); // closed without commit; the discharge and end expectations above are set up for this call
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3b9cd01/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index fff403b..cde7656 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -535,5 +535,38 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
 
         connection.close();
     }
-}
 
+    @Test(timeout = 90000) // Repeatedly checks that a transacted consumer receives queue messages in ascending MESSAGE_NUMBER order
+    public void testConsumerMessagesInOrder() throws Exception {
+
+        for (int i = 0; i < 5; ++i) { // run the whole scenario five times on a fresh connection each time
+
+            connection = createAmqpConnection(); // NOTE(review): the previous iteration's connection is not closed in this method - confirm teardown covers it
+            connection.start();
+
+            final int MESSAGE_COUNT = 20;
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(name.getMethodName());
+
+            sendToAmqQueue(MESSAGE_COUNT);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int j = 0; j < MESSAGE_COUNT; ++j) {
+                Message message = consumer.receive(5000);
+                assertNotNull(message);
+                assertEquals(j + 1, message.getIntProperty(MESSAGE_NUMBER)); // NOTE(review): assumes sendToAmqQueue numbers messages from 1 - confirm
+            }
+
+            session.close(); // NOTE(review): closed without commit; presumably the receives are rolled back, hence the purge below - confirm
+
+            QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+            proxy.purge();
+
+            assertEquals(0, proxy.getQueueSize());
+
+            consumer.close(); // NOTE(review): invoked after session.close(); presumably a no-op on an already closed consumer - confirm
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org