You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 15:08:30 UTC

[1/3] activemq-artemis git commit: This closes #821

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 73a73bb9e -> 54b7dcc48


This closes #821


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/330ddf0c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/330ddf0c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/330ddf0c

Branch: refs/heads/master
Commit: 330ddf0c6bf81613aea487bf73f383e5405ee6a4
Parents: 73a73bb ea54071
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:54:02 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:54:02 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  28 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  70 ++++++--
 .../integration/amqp/AmqpTransactionTest.java   | 162 +++++++++++++++++++
 3 files changed, 244 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-773 Tests that show issue with TX acquisition and retirement.

Posted by cl...@apache.org.
ARTEMIS-773 Tests that show issue with TX acquisition and retirement.

Two new tests that acquire messages inside a TX and manage their
settlement and retirement in differing ways. One test works; the other,
'testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement',
fails because messages that were committed are unexpectedly redelivered
to the client.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea54071f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea54071f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea54071f

Branch: refs/heads/master
Commit: ea54071f77f1ff378ca1fa8904b858797c4869d0
Parents: 73a73bb
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 6 16:53:10 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:54:02 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  28 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  70 ++++++--
 .../integration/amqp/AmqpTransactionTest.java   | 162 +++++++++++++++++++
 3 files changed, 244 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index e3e9681..2d95d29 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -128,11 +128,23 @@ public class AmqpMessage {
     * @throws Exception if an error occurs during the accept.
     */
    public void accept() throws Exception {
+      accept(true);
+   }
+
+   /**
+    * Accepts the message marking it as consumed on the remote peer.
+    *
+    * @param settle
+    *        true if the client should also settle the delivery when sending the accept.
+    *
+    * @throws Exception if an error occurs during the accept.
+    */
+   public void accept(boolean settle) throws Exception {
       if (receiver == null) {
          throw new IllegalStateException("Can't accept non-received message.");
       }
 
-      receiver.accept(delivery);
+      receiver.accept(delivery, settle);
    }
 
    /**
@@ -142,11 +154,23 @@ public class AmqpMessage {
     * @throws Exception if an error occurs during the accept.
     */
    public void accept(AmqpSession txnSession) throws Exception {
+      accept(txnSession, true);
+   }
+
+   /**
+    * Accepts the message marking it as consumed on the remote peer.
+    *
+    * @param txnSession The session that is used to manage acceptance of the message.
+    * @param settle true if the client should also settle the delivery when sending the accept.
+    *
+    * @throws Exception if an error occurs during the accept.
+    */
+   public void accept(AmqpSession txnSession, boolean settle) throws Exception {
       if (receiver == null) {
          throw new IllegalStateException("Can't accept non-received message.");
       }
 
-      receiver.accept(delivery, txnSession);
+      receiver.accept(delivery, txnSession, settle);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 8826713..b6d2ba1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import javax.jms.InvalidDestinationException;
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -27,6 +30,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.jms.InvalidDestinationException;
+
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
@@ -52,10 +57,6 @@ import org.apache.qpid.proton.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
-
 /**
  * Receiver class that manages a Proton receiver endpoint.
  */
@@ -390,27 +391,66 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Accepts a message that was dispatched under the given Delivery instance.
+    * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+    *
+    * @param delivery
+    *        the Delivery instance to accept.
     *
-    * @param delivery the Delivery instance to accept.
     * @throws IOException if an error occurs while sending the accept.
     */
-   public void accept(final Delivery delivery) throws IOException {
-      accept(delivery, this.session);
+   public void accept(Delivery delivery) throws IOException {
+      accept(delivery, this.session, true);
    }
 
    /**
     * Accepts a message that was dispatched under the given Delivery instance.
     *
+    * @param delivery
+    *        the Delivery instance to accept.
+    * @param settle
+    *        true if the receiver should settle the delivery or just send the disposition.
+    *
+    * @throws IOException if an error occurs while sending the accept.
+    */
+   public void accept(Delivery delivery, boolean settle) throws IOException {
+      accept(delivery, this.session, settle);
+   }
+
+   /**
+    * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+    *
     * This method allows for the session that is used in the accept to be specified by the
     * caller.  This allows for an accepted message to be involved in a transaction that is
     * being managed by some other session other than the one that created this receiver.
     *
-    * @param delivery the Delivery instance to accept.
-    * @param session  the session under which the message is being accepted.
+    * @param delivery
+    *        the Delivery instance to accept.
+    * @param session
+    *        the session under which the message is being accepted.
+    *
     * @throws IOException if an error occurs while sending the accept.
     */
    public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
+      accept(delivery, session, true);
+   }
+
+   /**
+    * Accepts a message that was dispatched under the given Delivery instance.
+    *
+    * This method allows for the session that is used in the accept to be specified by the
+    * caller.  This allows for an accepted message to be involved in a transaction that is
+    * being managed by some other session other than the one that created this receiver.
+    *
+    * @param delivery
+    *        the Delivery instance to accept.
+    * @param session
+    *        the session under which the message is being accepted.
+    * @param settle
+    *        true if the receiver should settle the delivery or just send the disposition.
+    *
+    * @throws IOException if an error occurs while sending the accept.
+    */
+   public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
       checkClosed();
 
       if (delivery == null) {
@@ -440,11 +480,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                         txState.setOutcome(Accepted.getInstance());
                         txState.setTxnId(txnId);
                         delivery.disposition(txState);
-                        delivery.settle();
                         session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
                      }
                   } else {
                      delivery.disposition(Accepted.getInstance());
+                  }
+
+                  if (settle) {
                      delivery.settle();
                   }
                }
@@ -462,8 +504,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Mark a message that was dispatched under the given Delivery instance as Modified.
     *
-    * @param delivery          the Delivery instance to mark modified.
-    * @param deliveryFailed    indicates that the delivery failed for some reason.
+    * @param delivery the Delivery instance to mark modified.
+    * @param deliveryFailed indicates that the delivery failed for some reason.
     * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
     * @throws IOException if an error occurs while sending the reject.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e84534f..e82e7b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -622,4 +622,166 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
       connection.close();
    }
+
+   @Test(timeout = 60000)
+   public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+      final int NUM_MESSAGES = 10;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+
+      // Root TXN session controls all TXN send lifetimes.
+      AmqpSession txnSession = connection.createSession();
+
+      // Normal Session which won't create an TXN itself
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(getTestName());
+
+      for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         message.setApplicationProperty("msgId", i);
+         sender.send(message, txnSession.getTransactionId());
+      }
+
+      // Read all messages from the Queue, do not accept them yet.
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+      receiver.flow((NUM_MESSAGES + 2) * 2);
+      for (int i = 0; i < NUM_MESSAGES; ++i) {
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+         assertNotNull(message);
+         messages.add(message);
+      }
+
+      // Commit half the consumed messages [0, 1, 2, 3, 4]
+      txnSession.begin();
+      for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+         System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+         messages.get(i).accept(txnSession, false);
+      }
+      txnSession.commit();
+
+      // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+      txnSession.begin();
+      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+         System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+         messages.get(i).accept(txnSession, false);
+      }
+      txnSession.rollback();
+
+      // After rollback messages should still be acquired so we read last sent message [10]
+      {
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+         assertNotNull(message);
+         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+         message.release();
+      }
+
+      // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
+      txnSession.begin();
+      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+         messages.get(i).accept(txnSession);
+      }
+      txnSession.commit();
+
+      // The final message [10] should still be pending as we released it previously and committed
+      // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+      {
+         receiver.flow(1);
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+         assertNotNull(message);
+         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+         message.accept();
+      }
+
+      // We should have now drained the Queue
+      receiver.flow(1);
+      AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+      if (message != null) {
+         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+      }
+      assertNull(message);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+      final int NUM_MESSAGES = 10;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+
+      // Root TXN session controls all TXN send lifetimes.
+      AmqpSession txnSession = connection.createSession();
+
+      // Normal Session which won't create an TXN itself
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(getTestName());
+
+      for (int i = 0; i < NUM_MESSAGES; ++i) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         message.setApplicationProperty("msgId", i);
+         sender.send(message, txnSession.getTransactionId());
+      }
+
+      // Read all messages from the Queue, do not accept them yet.
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+      AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+      AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+      // Accept the first one in a TXN and send a new message in that TXN as well
+      txnSession.begin();
+      {
+         // This will result in message [0[ being consumed once we commit.
+         message1.accept(txnSession, false);
+         System.out.println("Commit: accepting message: " + message1.getApplicationProperty("msgId"));
+
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+         sender.send(message, txnSession.getTransactionId());
+      }
+      txnSession.commit();
+
+      // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+      txnSession.begin();
+      {
+         message2.accept(txnSession, false);
+         System.out.println("Rollback: accepting message: " + message2.getApplicationProperty("msgId"));
+
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+         sender.send(message, txnSession.getTransactionId());
+      }
+      txnSession.rollback();
+
+      // This releases message [1]
+      message2.release();
+
+      // Should be ten message available for dispatch given that we sent and committed one, and
+      // releases another we had previously received.
+      receiver.flow(10);
+      for (int i = 1; i <= NUM_MESSAGES; ++i) {
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Expected a message for: " + i, message);
+         System.out.println("Accepting message: " + message.getApplicationProperty("msgId"));
+         assertEquals(i, message.getApplicationProperty("msgId"));
+         message.accept();
+      }
+
+      // Should be nothing left.
+      receiver.flow(1);
+      assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+      connection.close();
+   }
 }


[3/3] activemq-artemis git commit: ARTEMIS-773 closing connections through finally to avoid thread leakage

Posted by cl...@apache.org.
ARTEMIS-773 closing connections through finally to avoid thread leakage


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54b7dcc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54b7dcc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54b7dcc4

Branch: refs/heads/master
Commit: 54b7dcc48e3e76eca2a7cca1348e7c2625cc7ba6
Parents: 330ddf0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:57:21 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 11:04:45 2016 -0400

----------------------------------------------------------------------
 .../integration/amqp/AmqpTransactionTest.java   | 275 ++++++++++---------
 1 file changed, 141 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54b7dcc4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e82e7b3..e42a718 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
 
-      // Root TXN session controls all TXN send lifetimes.
-      AmqpSession txnSession = connection.createSession();
-
-      // Normal Session which won't create an TXN itself
-      AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(getTestName());
+      try {
 
-      for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("Test-Message");
-         message.setApplicationProperty("msgId", i);
-         sender.send(message, txnSession.getTransactionId());
-      }
+         // Root TXN session controls all TXN send lifetimes.
+         AmqpSession txnSession = connection.createSession();
 
-      // Read all messages from the Queue, do not accept them yet.
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-      ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
-      receiver.flow((NUM_MESSAGES + 2) * 2);
-      for (int i = 0; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         messages.add(message);
-      }
-
-      // Commit half the consumed messages
-      txnSession.begin();
-      for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.commit();
-
-      // Rollback the other half the consumed messages
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.rollback();
-
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
-      }
-
-      // Commit the other half the consumed messages
-      // This is a variation from the .NET client tests which doesn't settle the
-      // messages in the TX until commit is called but on ActiveMQ they will be
-      // redispatched regardless and not stay in the acquired state.
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         message.accept();
-      }
-      txnSession.commit();
+         // Normal Session which won't create an TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
 
-      // The final message should still be pending.
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         receiver.flow(1);
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
+         for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+         }
+
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+         receiver.flow((NUM_MESSAGES + 2) * 2);
+         for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+         }
+
+         // Commit half the consumed messages
+         txnSession.begin();
+         for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.commit();
+
+         // Rollback the other half the consumed messages
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.rollback();
+
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+         // Commit the other half the consumed messages
+         // This is a variation from the .NET client tests which doesn't settle the
+         // messages in the TX until commit is called but on ActiveMQ they will be
+         // redispatched regardless and not stay in the acquired state.
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+         }
+         txnSession.commit();
+
+         // The final message should still be pending.
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+      } finally {
+         connection.close();
       }
-
-      connection.close();
    }
 
    @Test(timeout = 60000)
@@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = client.connect();
 
-      // Root TXN session controls all TXN send lifetimes.
-      AmqpSession txnSession = connection.createSession();
-
-      // Normal Session which won't create an TXN itself
-      AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(getTestName());
-
-      for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("Test-Message");
-         message.setApplicationProperty("msgId", i);
-         sender.send(message, txnSession.getTransactionId());
-      }
-
-      // Read all messages from the Queue, do not accept them yet.
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-      ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
-      receiver.flow((NUM_MESSAGES + 2) * 2);
-      for (int i = 0; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         messages.add(message);
-      }
-
-      // Commit half the consumed messages [0, 1, 2, 3, 4]
-      txnSession.begin();
-      for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
-         System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
-         messages.get(i).accept(txnSession, false);
-      }
-      txnSession.commit();
+      try {
 
-      // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
-         messages.get(i).accept(txnSession, false);
-      }
-      txnSession.rollback();
+         // Root TXN session controls all TXN send lifetimes.
+         AmqpSession txnSession = connection.createSession();
 
-      // After rollback messages should still be acquired so we read last sent message [10]
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
-      }
-
-      // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.commit();
+         // Normal Session which won't create an TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
 
-      // The final message [10] should still be pending as we released it previously and committed
-      // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
-      {
+         for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+         }
+
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+         receiver.flow((NUM_MESSAGES + 2) * 2);
+         for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            messages.add(message);
+         }
+
+         // Commit half the consumed messages [0, 1, 2, 3, 4]
+         txnSession.begin();
+         for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+            messages.get(i).accept(txnSession, false);
+         }
+         txnSession.commit();
+
+         // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+            messages.get(i).accept(txnSession, false);
+         }
+         txnSession.rollback();
+
+         // After rollback messages should still be acquired so we read last sent message [10]
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+         // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.commit();
+
+         // The final message [10] should still be pending as we released it previously and committed
+         // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+         {
+            receiver.flow(1);
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.accept();
+         }
+
+         // We should have now drained the Queue
          receiver.flow(1);
          AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.accept();
+         if (message != null) {
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+         }
+         assertNull(message);
+      } finally {
+         connection.close();
       }
-
-      // We should have now drained the Queue
-      receiver.flow(1);
-      AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-      if (message != null) {
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-      }
-      assertNull(message);
-
-      connection.close();
    }
 
    @Test(timeout = 60000)