You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 15:08:32 UTC
[3/3] activemq-artemis git commit: ARTEMIS-773 closing connections through finally to avoid thread leakage
ARTEMIS-773 closing connections through finally to avoid thread leakage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54b7dcc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54b7dcc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54b7dcc4
Branch: refs/heads/master
Commit: 54b7dcc48e3e76eca2a7cca1348e7c2625cc7ba6
Parents: 330ddf0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:57:21 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 11:04:45 2016 -0400
----------------------------------------------------------------------
.../integration/amqp/AmqpTransactionTest.java | 275 ++++++++++---------
1 file changed, 141 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54b7dcc4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e82e7b3..e42a718 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
- // Root TXN session controls all TXN send lifetimes.
- AmqpSession txnSession = connection.createSession();
-
- // Normal Session which won't create an TXN itself
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ try {
- for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message");
- message.setApplicationProperty("msgId", i);
- sender.send(message, txnSession.getTransactionId());
- }
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
- // Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
- ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
- receiver.flow((NUM_MESSAGES + 2) * 2);
- for (int i = 0; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- messages.add(message);
- }
-
- // Commit half the consumed messages
- txnSession.begin();
- for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.commit();
-
- // Rollback the other half the consumed messages
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.rollback();
-
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
- }
-
- // Commit the other half the consumed messages
- // This is a variation from the .NET client tests which doesn't settle the
- // messages in the TX until commit is called but on ActiveMQ they will be
- // redispatched regardless and not stay in the acquired state.
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- message.accept();
- }
- txnSession.commit();
+ // Normal Session which won't create a TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
- // The final message should still be pending.
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- receiver.flow(1);
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // Rollback the other half the consumed messages
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.rollback();
+
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half the consumed messages
+ // This is a variation from the .NET client tests which doesn't settle the
+ // messages in the TX until commit is called but on ActiveMQ they will be
+ // redispatched regardless and not stay in the acquired state.
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ }
+ txnSession.commit();
+
+ // The final message should still be pending.
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ receiver.flow(1);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ } finally {
+ connection.close();
}
-
- connection.close();
}
@Test(timeout = 60000)
@@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
- // Root TXN session controls all TXN send lifetimes.
- AmqpSession txnSession = connection.createSession();
-
- // Normal Session which won't create an TXN itself
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
-
- for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message");
- message.setApplicationProperty("msgId", i);
- sender.send(message, txnSession.getTransactionId());
- }
-
- // Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
- ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
- receiver.flow((NUM_MESSAGES + 2) * 2);
- for (int i = 0; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- messages.add(message);
- }
-
- // Commit half the consumed messages [0, 1, 2, 3, 4]
- txnSession.begin();
- for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
- System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
- messages.get(i).accept(txnSession, false);
- }
- txnSession.commit();
+ try {
- // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
- messages.get(i).accept(txnSession, false);
- }
- txnSession.rollback();
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
- // After rollback messages should still be acquired so we read last sent message [10]
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
- }
-
- // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.commit();
+ // Normal Session which won't create a TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
- // The final message [10] should still be pending as we released it previously and committed
- // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
- {
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages [0, 1, 2, 3, 4]
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.commit();
+
+ // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.rollback();
+
+ // After rollback messages should still be acquired so we read last sent message [10]
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // The final message [10] should still be pending as we released it previously and committed
+ // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+ {
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.accept();
+ }
+
+ // We should have now drained the Queue
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.accept();
+ if (message != null) {
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ }
+ assertNull(message);
+ } finally {
+ connection.close();
}
-
- // We should have now drained the Queue
- receiver.flow(1);
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- if (message != null) {
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- }
- assertNull(message);
-
- connection.close();
}
@Test(timeout = 60000)