You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/11/17 16:04:04 UTC

[6/6] qpid-jms git commit: add brokerless integration test of committing a transacted session with a consumer

add brokerless integration test of committing a transacted session with a consumer


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/92112063
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/92112063
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/92112063

Branch: refs/heads/master
Commit: 92112063a48a62032da0811aadf5deb86749572d
Parents: e13dfc4
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 17 14:17:06 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 17 14:17:06 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 63 ++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  1 -
 2 files changed, 63 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/92112063/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 5a4a754..498af7f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -31,19 +32,31 @@ import java.io.IOException;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.DescriptorMatcher;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
 import org.junit.Test;
 
 public class SessionIntegrationTest extends QpidJmsTestCase {
@@ -311,4 +324,54 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=5000)
+    public void testCommitTransactedSessionWithConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"));
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
+
+            // Then expect a TransactionalState disposition for the applications message once consumed
+            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL));
+            testPeer.expectDisposition(false, stateMatcher);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the commit succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(false);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
+
+            session.commit();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/92112063/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index c0f2bc2..adc4fe9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -71,7 +71,6 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.codec.Data;
 import org.apache.qpid.proton.engine.impl.AmqpHeader;
 import org.hamcrest.Matcher;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org