You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/11 22:59:45 UTC

activemq git commit: NO-JIRA Add some additional testing around outcomes (cherry picked from commit f71e0ee15b2c456dac28cf456deafb764221afa6)

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x c42f81514 -> 330deb2c8


NO-JIRA Add some additional testing around outcomes
(cherry picked from commit f71e0ee15b2c456dac28cf456deafb764221afa6)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/330deb2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/330deb2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/330deb2c

Branch: refs/heads/activemq-5.14.x
Commit: 330deb2c898fef020f94d5599b3df12294c398ea
Parents: c42f815
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 18:58:51 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 11 18:59:41 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      | 15 ++++-
 .../transport/amqp/client/AmqpReceiver.java     | 37 +++++++++++
 .../amqp/interop/AmqpReceiverTest.java          | 68 ++++++++++++++++++++
 3 files changed, 119 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 952f98d..d28ac8e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -201,7 +201,7 @@ public class AmqpMessage {
     /**
      * Release the message, remote can redeliver it elsewhere.
      *
-     * @throws Exception if an error occurs during the reject.
+     * @throws Exception if an error occurs during the release.
      */
     public void release() throws Exception {
         if (receiver == null) {
@@ -211,6 +211,19 @@ public class AmqpMessage {
         receiver.release(delivery);
     }
 
+    /**
+     * Reject the message, remote can redeliver it elsewhere.
+     *
+     * @throws Exception if an error occurs during the reject.
+     */
+    public void reject() throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't release non-received message.");
+        }
+
+        receiver.reject(delivery);
+    }
+
     //----- Convenience methods for constructing outbound messages -----------//
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 3543ae3..d2b859a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -605,6 +605,43 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
+     * Reject a message that was dispatched under the given Delivery instance.
+     *
+     * @param delivery
+     *        the Delivery instance to reject.
+     *
+     * @throws IOException if an error occurs while sending the release.
+     */
+    public void reject(final Delivery delivery) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to release cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    if (!delivery.isSettled()) {
+                        delivery.disposition(new Rejected());
+                        delivery.settle();
+                        session.pumpToProtonTransport(request);
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
      * @return an unmodifiable view of the underlying Receiver instance.
      */
     public Receiver getReceiver() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 0b9a379..2a06561 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -461,6 +461,74 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 30000)
+    public void testReleasedDisposition() throws Exception {
+        sendMessages(getTestName(), 1, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver(getTestName());
+        receiver.flow(2);
+
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("did not receive message first time", message);
+
+        Message protonMessage = message.getWrappedMessage();
+        assertNotNull(protonMessage);
+        assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+        message.release();
+
+        // Read the message again and validate its state
+
+        message = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("did not receive message again", message);
+
+        message.accept();
+
+        protonMessage = message.getWrappedMessage();
+        assertNotNull(protonMessage);
+        assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+        connection.close();
+    }
+
+    @Test(timeout = 30000)
+    public void testRejectedDisposition() throws Exception {
+        sendMessages(getTestName(), 1, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver(getTestName());
+        receiver.flow(2);
+
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("did not receive message first time", message);
+
+        Message protonMessage = message.getWrappedMessage();
+        assertNotNull(protonMessage);
+        assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+        message.reject();
+
+        // Read the message again and validate its state
+
+        message = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("did not receive message again", message);
+
+        message.accept();
+
+        protonMessage = message.getWrappedMessage();
+        assertNotNull(protonMessage);
+        assertEquals("Unexpected updated value for AMQP delivery-count", 1, protonMessage.getDeliveryCount());
+
+        connection.close();
+    }
+
+    @Test(timeout = 30000)
     public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
         doModifiedDispositionTestImpl(Boolean.TRUE, null);
     }