You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:21 UTC

[09/14] activemq git commit: NO-JIRA: Add some additional tests ported from the .NET AMQP client

NO-JIRA: Add some additional tests ported from the .NET AMQP client

Adds some transaction tests ported from AMQP .NET client with some
variances based on the way the test client works and limitations in the
brokers handling of Transacted sends.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/12ee866a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/12ee866a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/12ee866a

Branch: refs/heads/activemq-5.14.x
Commit: 12ee866a6e1b72b53fa7a1de4b300098c92375ae
Parents: fa55149
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 15 13:24:18 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:18 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpTransactionTest.java       | 202 +++++++++++++++++++
 1 file changed, 202 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/12ee866a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 97089a9..7cf6026 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -18,8 +18,10 @@ package org.apache.activemq.transport.amqp.interop;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -372,4 +374,204 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
     }
+
+    //----- Tests Ported from AmqpNetLite client -----------------------------//
+
+    @Test(timeout = 60000)
+    public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 5;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Commit TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Rollback an additional batch of TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        // Commit more TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(NUM_MESSAGES * 2);
+        for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept(txnSession);
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+        receiver.flow((NUM_MESSAGES + 2) * 2);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+        }
+
+        // Commit half the consumed messages
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.commit();
+
+        // Rollback the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.rollback();
+
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        // Commit the other half the consumed messages
+        // This is a variation from the .NET client tests which doesn't settle the
+        // messages in the TX until commit is called but on ActiveMQ they will be
+        // redispatched regardless and not stay in the acquired state.
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+        }
+        txnSession.commit();
+
+        // The final message should still be pending.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+        // Accept the first one in a TXN and send a new message in that TXN as well
+        txnSession.begin();
+        {
+            message1.accept(txnSession);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+        txnSession.begin();
+        {
+            message2.accept(txnSession);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        // Variation here from .NET code, the client settles the accepted message where
+        // the .NET client does not and instead releases here to have it redelivered.
+
+        receiver.flow(NUM_MESSAGES);
+        for (int i = 1; i <= NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(i, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // Should be nothing left.
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
 }