You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/14 22:24:02 UTC

activemq git commit: NO-JIRA AMQP Test updates

Repository: activemq
Updated Branches:
  refs/heads/master da9fedead -> b4ab0e1af


NO-JIRA AMQP Test updates

Adds support for doing sends and receives that are enrolled in a
transaction created in a session other than the session that created the
sender or receiver.  Adds some tests that show this in action. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b4ab0e1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b4ab0e1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b4ab0e1a

Branch: refs/heads/master
Commit: b4ab0e1af9fb0bb56b8a4a9cfc948727a7b92e0c
Parents: da9fede
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 14 18:23:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 14 18:23:52 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  19 +-
 .../transport/amqp/client/AmqpReceiver.java     |  26 +++
 .../transport/amqp/client/AmqpSender.java       |  29 ++-
 .../transport/amqp/client/AmqpSession.java      |   8 +-
 .../amqp/interop/AmqpTransactionTest.java       | 194 +++++++++++++++++++
 5 files changed, 269 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 99f4cfb..8b378e1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -140,6 +140,22 @@ public class AmqpMessage {
     }
 
     /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param txnSession
+     *      The session that is used to manage acceptance of the message.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(AmqpSession txnSession) throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't accept non-received message.");
+        }
+
+        receiver.accept(delivery, txnSession);
+    }
+
+    /**
      * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
      *
      * @param deliveryFailed
@@ -374,7 +390,7 @@ public class AmqpMessage {
      * @param key
      *        the name used to lookup the property in the application properties.
      *
-     * @return the propety value or null if not set.
+     * @return the property value or null if not set.
      */
     public Object getApplicationProperty(String key) {
         if (applicationPropertiesMap == null) {
@@ -560,6 +576,7 @@ public class AmqpMessage {
             message.setHeader(new Header());
         }
     }
+
     private void lazyCreateProperties() {
         if (message.getProperties() == null) {
             message.setProperties(new Properties());

http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 77a529d..999e033 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
      * @throws IOException if an error occurs while sending the accept.
      */
     public void accept(final Delivery delivery) throws IOException {
+        accept(delivery, this.session);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * This method allows for the session that is used in the accept to be specified by the
+     * caller.  This allows for an accepted message to be involved in a transaction that is
+     * being managed by a session other than the one that created this receiver.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param session
+     *        the session under which the message is being accepted.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
         checkClosed();
 
         if (delivery == null) {
             throw new IllegalArgumentException("Delivery to accept cannot be null");
         }
 
+        if (session == null) {
+            throw new IllegalArgumentException("Session given cannot be null");
+        }
+
+        if (session.getConnection() != this.session.getConnection()) {
+            throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
+        }
+
         final ClientFuture request = new ClientFuture();
         session.getScheduler().execute(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index f9d6435..dd3a371 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -127,6 +127,21 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
      */
     public void send(final AmqpMessage message) throws IOException {
         checkClosed();
+        send(message, null);
+    }
+
+    /**
+     * Sends the given message to this sender's assigned address using the supplied transaction ID.
+     *
+     * @param message
+     *        the message to send.
+     * @param txId
+     *        the transaction ID to assign the outgoing send.
+     *
+     * @throws IOException if an error occurs during the send.
+     */
+    public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
+        checkClosed();
         final ClientFuture sendRequest = new ClientFuture();
 
         session.getScheduler().execute(new Runnable() {
@@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             @Override
             public void run() {
                 try {
-                    doSend(message, sendRequest);
+                    doSend(message, sendRequest, txId);
                     session.pumpToProtonTransport(sendRequest);
                 } catch (Exception e) {
                     sendRequest.onFailure(e);
@@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         }
     }
 
-    private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+    private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
         LOG.trace("Producer sending message: {}", message);
 
         Delivery delivery = null;
@@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
 
         delivery.setContext(request);
 
-        if (session.isInTransaction()) {
-            Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+        Binary amqpTxId = null;
+        if (txId != null) {
+            amqpTxId = txId.getRemoteTxId();
+        } else if (session.isInTransaction()) {
+            amqpTxId = session.getTransactionId().getRemoteTxId();
+        }
+
+        if (amqpTxId != null) {
             TransactionalState state = new TransactionalState();
             state.setTxnId(amqpTxId);
             delivery.disposition(state);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index ae99f65..3804603 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
         connection.pumpToProtonTransport(request);
     }
 
-    AmqpTransactionId getTransactionId() {
-        return txContext.getTransactionId();
+    public AmqpTransactionId getTransactionId() {
+        if (txContext != null && txContext.isInTransaction()) {
+            return txContext.getTransactionId();
+        }
+
+        return null;
     }
 
     AmqpTransactionContext getTransactionContext() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index a998290..97089a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         sender.close();
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Load up the Queue with some messages
+        {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender("queue://" + getTestName());
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message);
+            sender.send(message);
+            sender.send(message);
+            sender.close();
+        }
+
+        // Root TXN session controls the lifetime of the TXN that all accepts are enrolled in.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some receiver sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Receiver linked to each session
+        AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(3, queue.getQueueSize());
+
+        // Begin the transaction that all receivers will operate in.
+        txnSession.begin();
+
+        assertTrue(txnSession.isInTransaction());
+
+        receiver1.flow(1);
+        receiver2.flow(1);
+        receiver3.flow(1);
+
+        AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+        message1.accept(txnSession);
+        message2.accept(txnSession);
+        message3.accept(txnSession);
+
+        assertEquals(3, queue.getQueueSize());
+
+        txnSession.commit();
+
+        assertEquals(0, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Load up the Queue with some messages
+        {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender("queue://" + getTestName());
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message);
+            sender.send(message);
+            sender.send(message);
+            sender.close();
+        }
+
+        // Root TXN session controls the lifetime of the TXN that all accepts are enrolled in.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some receiver sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Receiver linked to each session
+        AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(3, queue.getQueueSize());
+
+        // Begin the transaction that all receivers will operate in.
+        txnSession.begin();
+
+        assertTrue(txnSession.isInTransaction());
+
+        receiver1.flow(1);
+        receiver2.flow(1);
+        receiver3.flow(1);
+
+        AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+        message1.accept(txnSession);
+        message2.accept(txnSession);
+        message3.accept(txnSession);
+
+        assertEquals(3, queue.getQueueSize());
+
+        txnSession.rollback();
+
+        assertEquals(3, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+        AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+        AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(0, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+
+        assertTrue(txnSession.isInTransaction());
+
+        sender1.send(message, txnSession.getTransactionId());
+        sender2.send(message, txnSession.getTransactionId());
+        sender3.send(message, txnSession.getTransactionId());
+
+        assertEquals(0, queue.getQueueSize());
+
+        txnSession.commit();
+
+        assertEquals(3, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+        AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+        AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(0, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+
+        assertTrue(txnSession.isInTransaction());
+
+        sender1.send(message, txnSession.getTransactionId());
+        sender2.send(message, txnSession.getTransactionId());
+        sender3.send(message, txnSession.getTransactionId());
+
+        assertEquals(0, queue.getQueueSize());
+
+        txnSession.rollback();
+
+        assertEquals(0, queue.getQueueSize());
+    }
 }