You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/13 23:07:08 UTC
qpid-jms git commit: QPIDJMS-125 Fix an potential NPE when a TX
Discharge fails with rejected that has a null error. Fix issue with
auto-starting new TX after commit or rollback fails. Adds some tests to cover
the above plus moves TX tests into their o
Repository: qpid-jms
Updated Branches:
refs/heads/master f5b8f9fc2 -> 0fd981ca1
QPIDJMS-125 Fix an potential NPE when a TX Discharge fails with rejected
that has a null error. Fix issue with auto-starting new TX after commit
or rollback fails. Adds some tests to cover the above plus moves TX
tests into their own JUnit test.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fd981ca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fd981ca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fd981ca
Branch: refs/heads/master
Commit: 0fd981ca1397607362c0c89920017fded813cec1
Parents: f5b8f9f
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 13 17:06:59 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 13 17:06:59 2015 -0400
----------------------------------------------------------------------
.../qpid/jms/JmsLocalTransactionContext.java | 2 +
.../provider/amqp/AmqpTransactionContext.java | 4 +-
.../jms/integration/SessionIntegrationTest.java | 412 -----------
.../TransactionsIntegrationTest.java | 720 +++++++++++++++++++
4 files changed, 724 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index daee929..7d60fa7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -183,6 +183,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void onPendingFailure(Throwable cause) {
+ reset();
}
});
@@ -235,6 +236,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void onPendingFailure(Throwable cause) {
+ reset();
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index a5e9284..648b896 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -98,8 +98,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
LOG.debug("Last TX request failed: {}", current.getProviderHint());
pendingDelivery.settle();
Rejected rejected = (Rejected) state;
- TransactionRolledBackException ex =
- new TransactionRolledBackException(rejected.getError().getDescription());
+ Exception cause = AmqpSupport.convertToException(rejected.getError());
+ TransactionRolledBackException ex = new TransactionRolledBackException(cause.getMessage());
AsyncResult request = this.pendingRequest;
this.current = null;
this.pendingRequest = null;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 4daec4b..9fc563f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.qpid.jms.integration;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
@@ -66,17 +65,12 @@ import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
-import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
@@ -952,78 +946,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
- public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(1, 1);
- }
-
- @Test(timeout=20000)
- public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(5, 2);
- }
-
- private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue("myQueue");
-
- testPeer.expectReceiverAttach();
- testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);
-
- for (int i = 1; i <= consumeCount; i++) {
- // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(new AcceptedMatcher());
-
- testPeer.expectDisposition(true, stateMatcher);
- }
-
- MessageConsumer messageConsumer = session.createConsumer(queue);
-
- for (int i = 1; i <= consumeCount; i++) {
- Message receivedMessage = messageConsumer.receive(3000);
-
- assertNotNull(receivedMessage);
- assertTrue(receivedMessage instanceof TextMessage);
- }
-
- // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
- // and reply with accepted and settled disposition to indicate the commit succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(false);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
-
- // Then expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- session.commit();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout=20000)
public void testIncomingMessageExceedsMaxRedeliveries() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int COUNT = 5;
@@ -1070,340 +992,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
- @Test(timeout=20000)
- public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a Declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue("myQueue");
-
- // Create a producer to use in provoking creation of the AMQP transaction
- testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
-
- // Expect the message which provoked creating the transaction. Check it carries
- // TransactionalState with the above txnId but has no outcome. Respond with a
- // TransactionalState with Accepted outcome.
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
- messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
-
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(nullValue());
-
- TransactionalState txState = new TransactionalState();
- txState.setTxnId(txnId);
- txState.setOutcome(new Accepted());
-
- testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
-
- producer.send(session.createMessage());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout=20000)
- public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
- doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
- }
-
- @Test(timeout=20000)
- public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
- doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
- }
-
- private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue("myQueue");
-
- testPeer.expectReceiverAttach();
- testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);
-
- for (int i = 1; i <= consumeCount; i++) {
- // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(new AcceptedMatcher());
-
- testPeer.expectDisposition(true, stateMatcher);
- }
-
- MessageConsumer messageConsumer = session.createConsumer(queue);
-
- for (int i = 1; i <= consumeCount; i++) {
- Message receivedMessage = messageConsumer.receive(3000);
-
- assertNotNull(receivedMessage);
- assertTrue(receivedMessage instanceof TextMessage);
- }
-
- // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
- testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO));
-
- // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
- // and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
-
- // Then expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- // Expect the messages that were not consumed to be released
- int unconsumed = transferCount - consumeCount;
- for (int i = 1; i <= unconsumed; i++) {
- testPeer.expectDisposition(true, new ReleasedMatcher());
- }
-
- // Expect the consumer to be 'started' again as rollback completes
- testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
-
- session.rollback();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout=20000)
- public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- int messageCount = 5;
- ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue("myQueue");
-
- // Create a consumer and fill the prefetch with messages, which we wont consume any of
- testPeer.expectReceiverAttach();
- testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
-
- session.createConsumer(queue);
-
- // Create a producer to use in provoking creation of the AMQP transaction
- testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
-
- // Expect the message which provoked creating the transaction
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
- messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(nullValue());
-
- TransactionalState txState = new TransactionalState();
- txState.setTxnId(txnId);
- txState.setOutcome(new Accepted());
-
- testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
-
- producer.send(session.createMessage());
-
- // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow
- // frame as we have manipulated that all the credit was already used, i.e. it already stopped.
-
- // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
- // and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
-
- // Now expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- // Expect the messages that were not consumed to be released
- for (int i = 1; i <= messageCount; i++) {
- testPeer.expectDisposition(true, new ReleasedMatcher());
- }
-
- // Expect the consumer to be 'started' again as rollback completes
- testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
-
- session.rollback();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout=20000)
- public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- int messageCount = 5;
- ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue("myQueue");
-
- // Create a consumer, expect it to flow credit, but don't send it any messages
- testPeer.expectReceiverAttach();
- testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
-
- session.createConsumer(queue);
-
- // Create a producer to use in provoking creation of the AMQP transaction
- testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
-
- // Expect the message which provoked creating the transaction
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
- messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
-
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(nullValue());
-
- TransactionalState txState = new TransactionalState();
- txState.setTxnId(txnId);
- txState.setOutcome(new Accepted());
-
- testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
-
- producer.send(session.createMessage());
-
- // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow.
- // Action the drain by filling the prefetch (which is equivalent to this having happened while
- // the Flow was in flight to the peer), and then DONT send a flow frame back to the client
- // as it can tell from the messages that all the credit has been used.
- testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
- messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false);
-
- // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
- // and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
-
- // Then expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- // Expect the messages that were not consumed to be released
- for (int i = 1; i <= messageCount; i++) {
- testPeer.expectDisposition(true, new ReleasedMatcher());
- }
-
- // Expect the consumer to be 'started' again as rollback completes
- testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
-
- session.rollback();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout=20000)
- public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- connection.start();
-
- testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a declared disposition state containing the txnId.
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
-
- SourceMatcher sourceMatcher = new SourceMatcher();
- sourceMatcher.withAddress(equalTo(queueName));
- sourceMatcher.withDynamic(equalTo(false));
- sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL));
- ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
- sourceMatcher.withDefaultOutcome(outcomeMatcher);
-
- testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
- testPeer.expectLinkFlow();
-
- session.createConsumer(queue);
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
@Test(timeout=20000)
public void testPrefetchPolicyInfluencesCreditFlow() throws Exception {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
new file mode 100644
index 0000000..947700b
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+/**
+ * Tests for behavior of Transacted Session operations.
+ */
+public class TransactionsIntegrationTest extends QpidJmsTestCase {
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout=20000)
+ public void testTransactionCommitFailWithEmptyRejectedDisposition() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with rejected and settled disposition to indicate the commit failed
+ Discharge discharge = new Discharge();
+ discharge.setFail(false);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Rejected(), true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ try {
+ session.commit();
+ fail("Commit operation should have failed.");
+ } catch (TransactionRolledBackException jmsTxRb) {
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with rejected and settled disposition to indicate the commit failed
+ Discharge discharge = new Discharge();
+ discharge.setFail(false);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ try {
+ session.commit();
+ fail("Commit operation should have failed.");
+ } catch (TransactionRolledBackException jmsTxRb) {
+ }
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testProducedMessagesAfterRollbackSentMessagesFails() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with rejected and settled disposition to indicate the rollback failed
+ Discharge discharge = new Discharge();
+ discharge.setFail(true);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ try {
+ session.rollback();
+ fail("Rollback operation should have failed.");
+ } catch (JMSException jmsex) {
+ }
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(1, 1);
+ }
+
+ @Test(timeout=20000)
+ public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(5, 2);
+ }
+
+ private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);
+
+ for (int i = 1; i <= consumeCount; i++) {
+ // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(new AcceptedMatcher());
+
+ testPeer.expectDisposition(true, stateMatcher);
+ }
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ for (int i = 1; i <= consumeCount; i++) {
+ Message receivedMessage = messageConsumer.receive(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ }
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the commit succeeded
+ Discharge discharge = new Discharge();
+ discharge.setFail(false);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ session.commit();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
+ doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
+ }
+
+ @Test(timeout=20000)
+ public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
+ doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
+ }
+
+ private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);
+
+ for (int i = 1; i <= consumeCount; i++) {
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(new AcceptedMatcher());
+
+ testPeer.expectDisposition(true, stateMatcher);
+ }
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ for (int i = 1; i <= consumeCount; i++) {
+ Message receivedMessage = messageConsumer.receive(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ }
+
+ // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+ testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO));
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ Discharge discharge = new Discharge();
+ discharge.setFail(true);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ // Expect the messages that were not consumed to be released
+ int unconsumed = transferCount - consumeCount;
+ for (int i = 1; i <= unconsumed; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher());
+ }
+
+ // Expect the consumer to be 'started' again as rollback completes
+ testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
+
+ session.rollback();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ int messageCount = 5;
+ ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a consumer and fill the prefetch with messages, which we wont consume any of
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
+
+ session.createConsumer(queue);
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which provoked creating the transaction
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow
+ // frame as we have manipulated that all the credit was already used, i.e. it already stopped.
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ Discharge discharge = new Discharge();
+ discharge.setFail(true);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+
+ // Now expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ // Expect the messages that were not consumed to be released
+ for (int i = 1; i <= messageCount; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher());
+ }
+
+ // Expect the consumer to be 'started' again as rollback completes
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+
+ session.rollback();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ int messageCount = 5;
+ ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a consumer, expect it to flow credit, but don't send it any messages
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+
+ session.createConsumer(queue);
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // Expect the message which provoked creating the transaction
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ producer.send(session.createMessage());
+
+ // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow.
+ // Action the drain by filling the prefetch (which is equivalent to this having happened while
+ // the Flow was in flight to the peer), and then DONT send a flow frame back to the client
+ // as it can tell from the messages that all the credit has been used.
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
+ messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false);
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ Discharge discharge = new Discharge();
+ discharge.setFail(true);
+ discharge.setTxnId(txnId);
+ TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+ dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+ testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ // Expect the messages that were not consumed to be released
+ for (int i = 1; i <= messageCount; i++) {
+ testPeer.expectDisposition(true, new ReleasedMatcher());
+ }
+
+ // Expect the consumer to be 'started' again as rollback completes
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+
+ session.rollback();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+ testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ SourceMatcher sourceMatcher = new SourceMatcher();
+ sourceMatcher.withAddress(equalTo(queueName));
+ sourceMatcher.withDynamic(equalTo(false));
+ sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL));
+ ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
+ sourceMatcher.withDefaultOutcome(outcomeMatcher);
+
+ testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+ testPeer.expectLinkFlow();
+
+ session.createConsumer(queue);
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org