You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/11 22:59:45 UTC
activemq git commit: NO-JIRA Add some additional testing around
outcomes (cherry picked from commit f71e0ee15b2c456dac28cf456deafb764221afa6)
Repository: activemq
Updated Branches:
refs/heads/activemq-5.14.x c42f81514 -> 330deb2c8
NO-JIRA Add some additional testing around outcomes
(cherry picked from commit f71e0ee15b2c456dac28cf456deafb764221afa6)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/330deb2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/330deb2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/330deb2c
Branch: refs/heads/activemq-5.14.x
Commit: 330deb2c898fef020f94d5599b3df12294c398ea
Parents: c42f815
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 18:58:51 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 11 18:59:41 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 15 ++++-
.../transport/amqp/client/AmqpReceiver.java | 37 +++++++++++
.../amqp/interop/AmqpReceiverTest.java | 68 ++++++++++++++++++++
3 files changed, 119 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 952f98d..d28ac8e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -201,7 +201,7 @@ public class AmqpMessage {
/**
* Release the message, remote can redeliver it elsewhere.
*
- * @throws Exception if an error occurs during the reject.
+ * @throws Exception if an error occurs during the release.
*/
public void release() throws Exception {
if (receiver == null) {
@@ -211,6 +211,19 @@ public class AmqpMessage {
receiver.release(delivery);
}
+ /**
+ * Reject the message through its associated receiver; throws IllegalStateException if it was never received.
+ *
+ * @throws Exception if an error occurs during the reject.
+ */
+ public void reject() throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't reject non-received message.");
+ }
+
+ receiver.reject(delivery);
+ }
+
//----- Convenience methods for constructing outbound messages -----------//
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 3543ae3..d2b859a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -605,6 +605,43 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
/**
+ * Reject a message that was dispatched under the given Delivery instance.
+ *
+ * @param delivery
+ * the Delivery instance to reject.
+ *
+ * @throws IOException if an error occurs while sending the release.
+ */
+ public void reject(final Delivery delivery) throws IOException {
+ checkClosed();
+
+ if (delivery == null) {
+ throw new IllegalArgumentException("Delivery to release cannot be null");
+ }
+
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ try {
+ if (!delivery.isSettled()) {
+ delivery.disposition(new Rejected());
+ delivery.settle();
+ session.pumpToProtonTransport(request);
+ }
+ request.onSuccess();
+ } catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
* @return an unmodifiable view of the underlying Receiver instance.
*/
public Receiver getReceiver() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/330deb2c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 0b9a379..2a06561 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -461,6 +461,74 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
}
@Test(timeout = 30000)
+ public void testReleasedDisposition() throws Exception {
+ sendMessages(getTestName(), 1, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = trackConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(2);
+
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("did not receive message first time", message);
+
+ Message protonMessage = message.getWrappedMessage();
+ assertNotNull(protonMessage);
+ assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+ message.release();
+
+ // Read the message again and validate its state
+
+ message = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("did not receive message again", message);
+
+ message.accept();
+
+ protonMessage = message.getWrappedMessage();
+ assertNotNull(protonMessage);
+ assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 30000)
+ public void testRejectedDisposition() throws Exception {
+ // Presumably queues one message on the destination named after this test.
+ sendMessages(getTestName(), 1, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = trackConnection(client.connect());
+ AmqpReceiver receiver = connection.createSession().createReceiver(getTestName());
+
+ receiver.flow(2);
+
+ AmqpMessage first = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("did not receive message first time", first);
+
+ Message firstWrapped = first.getWrappedMessage();
+ assertNotNull(firstWrapped);
+ assertEquals("Unexpected initial value for AMQP delivery-count", 0, firstWrapped.getDeliveryCount());
+
+ // Reject the delivery, then wait for the message to arrive a second time.
+ first.reject();
+
+ AmqpMessage second = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("did not receive message again", second);
+
+ second.accept();
+
+ // After a reject the delivery-count is expected to have been bumped to one.
+ Message secondWrapped = second.getWrappedMessage();
+ assertNotNull(secondWrapped);
+ assertEquals("Unexpected updated value for AMQP delivery-count", 1, secondWrapped.getDeliveryCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 30000)
public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(Boolean.TRUE, null);
}