You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 15:08:32 UTC

[3/3] activemq-artemis git commit: ARTEMIS-773 closing connections through finally to avoid thread leakage

ARTEMIS-773 closing connections through finally to avoid thread leakage


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54b7dcc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54b7dcc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54b7dcc4

Branch: refs/heads/master
Commit: 54b7dcc48e3e76eca2a7cca1348e7c2625cc7ba6
Parents: 330ddf0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:57:21 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 11:04:45 2016 -0400

----------------------------------------------------------------------
 .../integration/amqp/AmqpTransactionTest.java   | 275 ++++++++++---------
 1 file changed, 141 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54b7dcc4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e82e7b3..e42a718 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
 
-      // Root TXN session controls all TXN send lifetimes.
-      AmqpSession txnSession = connection.createSession();
-
-      // Normal Session which won't create an TXN itself
-      AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(getTestName());
+      try {
 
-      for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("Test-Message");
-         message.setApplicationProperty("msgId", i);
-         sender.send(message, txnSession.getTransactionId());
-      }
+         // Root TXN session controls all TXN send lifetimes.
+         AmqpSession txnSession = connection.createSession();
 
-      // Read all messages from the Queue, do not accept them yet.
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-      ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
-      receiver.flow((NUM_MESSAGES + 2) * 2);
-      for (int i = 0; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         messages.add(message);
-      }
-
-      // Commit half the consumed messages
-      txnSession.begin();
-      for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.commit();
-
-      // Rollback the other half the consumed messages
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.rollback();
-
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
-      }
-
-      // Commit the other half the consumed messages
-      // This is a variation from the .NET client tests which doesn't settle the
-      // messages in the TX until commit is called but on ActiveMQ they will be
-      // redispatched regardless and not stay in the acquired state.
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
-         message.accept();
-      }
-      txnSession.commit();
+         // Normal Session which won't create a TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
 
-      // The final message should still be pending.
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         receiver.flow(1);
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
+         for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+         }
+
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+         receiver.flow((NUM_MESSAGES + 2) * 2);
+         for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+         }
+
+         // Commit half the consumed messages
+         txnSession.begin();
+         for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.commit();
+
+         // Roll back the other half of the consumed messages
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.rollback();
+
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+         // Commit the other half of the consumed messages.
+         // This is a variation from the .NET client tests, which don't settle the
+         // messages in the TX until commit is called; on ActiveMQ they will be
+         // redispatched regardless and will not stay in the acquired state.
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+         }
+         txnSession.commit();
+
+         // The final message should still be pending.
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+      } finally {
+         connection.close();
       }
-
-      connection.close();
    }
 
    @Test(timeout = 60000)
@@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = client.connect();
 
-      // Root TXN session controls all TXN send lifetimes.
-      AmqpSession txnSession = connection.createSession();
-
-      // Normal Session which won't create an TXN itself
-      AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(getTestName());
-
-      for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("Test-Message");
-         message.setApplicationProperty("msgId", i);
-         sender.send(message, txnSession.getTransactionId());
-      }
-
-      // Read all messages from the Queue, do not accept them yet.
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-      ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
-      receiver.flow((NUM_MESSAGES + 2) * 2);
-      for (int i = 0; i < NUM_MESSAGES; ++i) {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         messages.add(message);
-      }
-
-      // Commit half the consumed messages [0, 1, 2, 3, 4]
-      txnSession.begin();
-      for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
-         System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
-         messages.get(i).accept(txnSession, false);
-      }
-      txnSession.commit();
+      try {
 
-      // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
-         messages.get(i).accept(txnSession, false);
-      }
-      txnSession.rollback();
+         // Root TXN session controls all TXN send lifetimes.
+         AmqpSession txnSession = connection.createSession();
 
-      // After rollback messages should still be acquired so we read last sent message [10]
-      {
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.release();
-      }
-
-      // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
-      txnSession.begin();
-      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
-         messages.get(i).accept(txnSession);
-      }
-      txnSession.commit();
+         // Normal Session which won't create a TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
 
-      // The final message [10] should still be pending as we released it previously and committed
-      // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
-      {
+         for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+         }
+
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+         receiver.flow((NUM_MESSAGES + 2) * 2);
+         for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            messages.add(message);
+         }
+
+         // Commit half the consumed messages [0, 1, 2, 3, 4]
+         txnSession.begin();
+         for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+            messages.get(i).accept(txnSession, false);
+         }
+         txnSession.commit();
+
+         // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+            messages.get(i).accept(txnSession, false);
+         }
+         txnSession.rollback();
+
+         // After rollback messages should still be acquired so we read last sent message [10]
+         {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+         }
+
+         // Commit the other half of the consumed messages [5, 6, 7, 8, 9], which should still be acquired
+         txnSession.begin();
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+         }
+         txnSession.commit();
+
+         // The final message [10] should still be pending as we released it previously and committed
+         // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+         {
+            receiver.flow(1);
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.accept();
+         }
+
+         // We should have now drained the Queue
          receiver.flow(1);
          AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-         assertNotNull(message);
-         assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
-         message.accept();
+         if (message != null) {
+            System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+         }
+         assertNull(message);
+      } finally {
+         connection.close();
       }
-
-      // We should have now drained the Queue
-      receiver.flow(1);
-      AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-      if (message != null) {
-         System.out.println("Read message: " + message.getApplicationProperty("msgId"));
-      }
-      assertNull(message);
-
-      connection.close();
    }
 
    @Test(timeout = 60000)