You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 15:08:30 UTC
[1/3] activemq-artemis git commit: This closes #821
Repository: activemq-artemis
Updated Branches:
refs/heads/master 73a73bb9e -> 54b7dcc48
This closes #821
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/330ddf0c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/330ddf0c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/330ddf0c
Branch: refs/heads/master
Commit: 330ddf0c6bf81613aea487bf73f383e5405ee6a4
Parents: 73a73bb ea54071
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:54:02 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:54:02 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 28 +++-
.../transport/amqp/client/AmqpReceiver.java | 70 ++++++--
.../integration/amqp/AmqpTransactionTest.java | 162 +++++++++++++++++++
3 files changed, 244 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-773 Tests that show issue
with TX acquisition and retirement.
Posted by cl...@apache.org.
ARTEMIS-773 Tests that show issue with TX acquisition and retirement.
Two new tests that acquire messages inside a TX and manage their
settlement and retirement in differing ways. One test works; the other,
'testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement',
fails because messages that were committed are unexpectedly redelivered
to the client.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea54071f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea54071f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea54071f
Branch: refs/heads/master
Commit: ea54071f77f1ff378ca1fa8904b858797c4869d0
Parents: 73a73bb
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 6 16:53:10 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:54:02 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 28 +++-
.../transport/amqp/client/AmqpReceiver.java | 70 ++++++--
.../integration/amqp/AmqpTransactionTest.java | 162 +++++++++++++++++++
3 files changed, 244 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index e3e9681..2d95d29 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -128,11 +128,23 @@ public class AmqpMessage {
* @throws Exception if an error occurs during the accept.
*/
public void accept() throws Exception {
+ accept(true);
+ }
+
+ /**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @param settle
+ * true if the client should also settle the delivery when sending the accept.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept(boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
- receiver.accept(delivery);
+ receiver.accept(delivery, settle);
}
/**
@@ -142,11 +154,23 @@ public class AmqpMessage {
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession) throws Exception {
+ accept(txnSession, true);
+ }
+
+ /**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @param txnSession
+ * The session that is used to manage acceptance of the message.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept(AmqpSession txnSession, boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
- receiver.accept(delivery, txnSession);
+ receiver.accept(delivery, txnSession, settle);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 8826713..b6d2ba1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.transport.amqp.client;
-import javax.jms.InvalidDestinationException;
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
@@ -27,6 +30,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.InvalidDestinationException;
+
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
@@ -52,10 +57,6 @@ import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
-
/**
* Receiver class that manages a Proton receiver endpoint.
*/
@@ -390,27 +391,66 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
/**
- * Accepts a message that was dispatched under the given Delivery instance.
+ * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+ *
+ * @param delivery
+ * the Delivery instance to accept.
*
- * @param delivery the Delivery instance to accept.
* @throws IOException if an error occurs while sending the accept.
*/
- public void accept(final Delivery delivery) throws IOException {
- accept(delivery, this.session);
+ public void accept(Delivery delivery) throws IOException {
+ accept(delivery, this.session, true);
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
*
+ * @param delivery
+ * the Delivery instance to accept.
+ * @param settle
+ * true if the receiver should settle the delivery or just send the disposition.
+ *
+ * @throws IOException if an error occurs while sending the accept.
+ */
+ public void accept(Delivery delivery, boolean settle) throws IOException {
+ accept(delivery, this.session, settle);
+ }
+
+ /**
+ * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+ *
* This method allows for the session that is used in the accept to be specified by the
* caller. This allows for an accepted message to be involved in a transaction that is
* being managed by some other session other than the one that created this receiver.
*
- * @param delivery the Delivery instance to accept.
- * @param session the session under which the message is being accepted.
+ * @param delivery
+ * the Delivery instance to accept.
+ * @param session
+ * the session under which the message is being accepted.
+ *
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
+ accept(delivery, session, true);
+ }
+
+ /**
+ * Accepts a message that was dispatched under the given Delivery instance.
+ *
+ * This method allows for the session that is used in the accept to be specified by the
+ * caller. This allows for an accepted message to be involved in a transaction that is
+ * being managed by some other session other than the one that created this receiver.
+ *
+ * @param delivery
+ * the Delivery instance to accept.
+ * @param session
+ * the session under which the message is being accepted.
+ * @param settle
+ * true if the receiver should settle the delivery or just send the disposition.
+ *
+ * @throws IOException if an error occurs while sending the accept.
+ */
+ public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
checkClosed();
if (delivery == null) {
@@ -440,11 +480,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
txState.setOutcome(Accepted.getInstance());
txState.setTxnId(txnId);
delivery.disposition(txState);
- delivery.settle();
session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
}
} else {
delivery.disposition(Accepted.getInstance());
+ }
+
+ if (settle) {
delivery.settle();
}
}
@@ -462,8 +504,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
/**
* Mark a message that was dispatched under the given Delivery instance as Modified.
*
- * @param delivery the Delivery instance to mark modified.
- * @param deliveryFailed indicates that the delivery failed for some reason.
+ * @param delivery the Delivery instance to mark modified.
+ * @param deliveryFailed indicates that the delivery failed for some reason.
* @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
* @throws IOException if an error occurs while sending the reject.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea54071f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e84534f..e82e7b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -622,4 +622,166 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+ final int NUM_MESSAGES = 10;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Normal Session which won't create an TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages [0, 1, 2, 3, 4]
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.commit();
+
+ // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.rollback();
+
+ // After rollback messages should still be acquired so we read last sent message [10]
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // The final message [10] should still be pending as we released it previously and committed
+ // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+ {
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.accept();
+ }
+
+ // We should have now drained the Queue
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ if (message != null) {
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ }
+ assertNull(message);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+ final int NUM_MESSAGES = 10;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Normal Session which won't create an TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(2);
+ AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+ // Accept the first one in a TXN and send a new message in that TXN as well
+ txnSession.begin();
+ {
+ // This will result in message [0] being consumed once we commit.
+ message1.accept(txnSession, false);
+ System.out.println("Commit: accepting message: " + message1.getApplicationProperty("msgId"));
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.commit();
+
+ // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+ txnSession.begin();
+ {
+ message2.accept(txnSession, false);
+ System.out.println("Rollback: accepting message: " + message2.getApplicationProperty("msgId"));
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.rollback();
+
+ // This releases message [1]
+ message2.release();
+
+ // Should be ten messages available for dispatch given that we sent and committed one, and
+ // released another we had previously received.
+ receiver.flow(10);
+ for (int i = 1; i <= NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Expected a message for: " + i, message);
+ System.out.println("Accepting message: " + message.getApplicationProperty("msgId"));
+ assertEquals(i, message.getApplicationProperty("msgId"));
+ message.accept();
+ }
+
+ // Should be nothing left.
+ receiver.flow(1);
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+ connection.close();
+ }
}
[3/3] activemq-artemis git commit: ARTEMIS-773 closing connections
through finally to avoid thread leakage
Posted by cl...@apache.org.
ARTEMIS-773 closing connections through finally to avoid thread leakage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54b7dcc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54b7dcc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54b7dcc4
Branch: refs/heads/master
Commit: 54b7dcc48e3e76eca2a7cca1348e7c2625cc7ba6
Parents: 330ddf0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:57:21 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 11:04:45 2016 -0400
----------------------------------------------------------------------
.../integration/amqp/AmqpTransactionTest.java | 275 ++++++++++---------
1 file changed, 141 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54b7dcc4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e82e7b3..e42a718 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
- // Root TXN session controls all TXN send lifetimes.
- AmqpSession txnSession = connection.createSession();
-
- // Normal Session which won't create an TXN itself
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ try {
- for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message");
- message.setApplicationProperty("msgId", i);
- sender.send(message, txnSession.getTransactionId());
- }
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
- // Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
- ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
- receiver.flow((NUM_MESSAGES + 2) * 2);
- for (int i = 0; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- messages.add(message);
- }
-
- // Commit half the consumed messages
- txnSession.begin();
- for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.commit();
-
- // Rollback the other half the consumed messages
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.rollback();
-
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
- }
-
- // Commit the other half the consumed messages
- // This is a variation from the .NET client tests which doesn't settle the
- // messages in the TX until commit is called but on ActiveMQ they will be
- // redispatched regardless and not stay in the acquired state.
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- message.accept();
- }
- txnSession.commit();
+ // Normal Session which won't create an TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
- // The final message should still be pending.
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- receiver.flow(1);
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // Rollback the other half the consumed messages
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.rollback();
+
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half the consumed messages
+ // This is a variation from the .NET client tests which doesn't settle the
+ // messages in the TX until commit is called but on ActiveMQ they will be
+ // redispatched regardless and not stay in the acquired state.
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ }
+ txnSession.commit();
+
+ // The final message should still be pending.
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ receiver.flow(1);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ } finally {
+ connection.close();
}
-
- connection.close();
}
@Test(timeout = 60000)
@@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
- // Root TXN session controls all TXN send lifetimes.
- AmqpSession txnSession = connection.createSession();
-
- // Normal Session which won't create an TXN itself
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
-
- for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message");
- message.setApplicationProperty("msgId", i);
- sender.send(message, txnSession.getTransactionId());
- }
-
- // Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
- ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
- receiver.flow((NUM_MESSAGES + 2) * 2);
- for (int i = 0; i < NUM_MESSAGES; ++i) {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- messages.add(message);
- }
-
- // Commit half the consumed messages [0, 1, 2, 3, 4]
- txnSession.begin();
- for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
- System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
- messages.get(i).accept(txnSession, false);
- }
- txnSession.commit();
+ try {
- // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
- messages.get(i).accept(txnSession, false);
- }
- txnSession.rollback();
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
- // After rollback messages should still be acquired so we read last sent message [10]
- {
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.release();
- }
-
- // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
- txnSession.begin();
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
- messages.get(i).accept(txnSession);
- }
- txnSession.commit();
+ // Normal Session which won't create an TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
- // The final message [10] should still be pending as we released it previously and committed
- // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
- {
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read all messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages [0, 1, 2, 3, 4]
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.commit();
+
+ // Rollback the other half the consumed messages [5, 6, 7, 8, 9]
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
+ messages.get(i).accept(txnSession, false);
+ }
+ txnSession.rollback();
+
+ // After rollback messages should still be acquired so we read last sent message [10]
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // The final message [10] should still be pending as we released it previously and committed
+ // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
+ {
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.accept();
+ }
+
+ // We should have now drained the Queue
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- assertNotNull(message);
- assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
- message.accept();
+ if (message != null) {
+ System.out.println("Read message: " + message.getApplicationProperty("msgId"));
+ }
+ assertNull(message);
+ } finally {
+ connection.close();
}
-
- // We should have now drained the Queue
- receiver.flow(1);
- AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- if (message != null) {
- System.out.println("Read message: " + message.getApplicationProperty("msgId"));
- }
- assertNull(message);
-
- connection.close();
}
@Test(timeout = 60000)