You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/07/16 13:47:41 UTC
activemq git commit: AMQ-5890: prevent NPE if Modified disposition is
applied without the delivery-failed flag set,
add some general tests of Modified handling
Repository: activemq
Updated Branches:
refs/heads/master c85c7c147 -> 0cfd22591
AMQ-5890: prevent NPE if Modified disposition is applied without the delivery-failed flag set, add some general tests of Modified handling
https://issues.apache.org/jira/browse/AMQ-5890
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0cfd2259
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0cfd2259
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0cfd2259
Branch: refs/heads/master
Commit: 0cfd22591260540301b18a893cd229c59e43dcf6
Parents: c85c7c1
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jul 16 12:46:24 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jul 16 12:46:24 2015 +0100
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpSender.java | 2 +-
.../transport/amqp/client/AmqpMessage.java | 36 +++--------
.../transport/amqp/client/AmqpReceiver.java | 11 ++--
.../amqp/interop/AmqpReceiverTest.java | 65 ++++++++++++++++++++
4 files changed, 78 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 4cbf744..e0e7276 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -227,7 +227,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
settle(delivery, -1);
} else if (state instanceof Modified) {
Modified modified = (Modified) state;
- if (modified.getDeliveryFailed()) {
+ if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 0acd1c6..e8ad793 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -140,43 +140,21 @@ public class AmqpMessage {
}
/**
- * Rejects the message, marking it as not deliverable here and failed to deliver.
+ * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
*
- * @throws Exception if an error occurs during the reject.
- */
- public void reject() throws Exception {
- reject(true, true);
- }
-
- /**
- * Rejects the message, marking it as failed to deliver and applying the given value
- * to the undeliverable here tag.
- *
- * @param undeliverableHere
- * marks the delivery as not being able to be process by link it was sent to.
- *
- * @throws Exception if an error occurs during the reject.
- */
- public void reject(boolean undeliverableHere) throws Exception {
- reject(undeliverableHere, true);
- }
-
- /**
- * Rejects the message, marking it as not deliverable here and failed to deliver.
- *
- * @param undeliverableHere
- * marks the delivery as not being able to be process by link it was sent to.
* @param deliveryFailed
* indicates that the delivery failed for some reason.
+ * @param undeliverableHere
+ * marks the delivery as not being able to be processed by the link it was sent to.
*
- * @throws Exception if an error occurs during the reject.
+ * @throws Exception if an error occurs during the process.
*/
- public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception {
+ public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
if (receiver == null) {
- throw new IllegalStateException("Can't reject non-received message.");
+ throw new IllegalStateException("Can't modify non-received message.");
}
- receiver.reject(delivery, undeliverableHere, deliveryFailed);
+ receiver.modified(delivery, deliveryFailed, undeliverableHere);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 049fe4d..98241cd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -316,18 +316,17 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
/**
- * Reject a message that was dispatched under the given Delivery instance.
+ * Mark a message that was dispatched under the given Delivery instance as Modified.
*
* @param delivery
- * the Delivery instance to reject.
- * @param undeliverableHere
- * marks the delivery as not being able to be process by link it was sent to.
+ * the Delivery instance to mark modified.
* @param deliveryFailed
* indicates that the delivery failed for some reason.
- *
+ * @param undeliverableHere
+ * marks the delivery as not being able to be processed by the link it was sent to.
* @throws IOException if an error occurs while sending the reject.
*/
- public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException {
+ public void modified(final Delivery delivery, final Boolean deliveryFailed, final Boolean undeliverableHere) throws IOException {
checkClosed();
if (delivery == null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 30571f7..1502bda 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
@@ -43,6 +44,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
import org.junit.Test;
/**
@@ -412,4 +414,67 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.getStateInspector().assertValid();
connection.close();
}
+
+ @Test(timeout = 30000)
+ public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
+ doModifiedDispositionTestImpl(Boolean.TRUE, null);
+ }
+
+ @Test(timeout = 30000)
+ public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
+ doModifiedDispositionTestImpl(null, null);
+ }
+
+ @Test(timeout = 30000)
+ public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
+ doModifiedDispositionTestImpl(null, Boolean.TRUE);
+ }
+
+ @Test(timeout = 30000)
+ public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
+ doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE);
+ }
+
+ private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
+ int msgCount = 1;
+ sendMessages(getTestName(), msgCount, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ receiver.flow(2 * msgCount);
+
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("did not receive message first time", message);
+
+ Message protonMessage = message.getWrappedMessage();
+ assertNotNull(protonMessage);
+ assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+ message.modified(deliveryFailed, undeliverableHere);
+
+ if(Boolean.TRUE.equals(undeliverableHere)) {
+ message = receiver.receive(250, TimeUnit.MILLISECONDS);
+ assertNull("Should not receive message again", message);
+ } else {
+ message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("did not receive message again", message);
+
+ int expectedDeliveryCount = 0;
+ if(Boolean.TRUE.equals(deliveryFailed)) {
+ expectedDeliveryCount = 1;
+ }
+
+ message.accept();
+
+ Message protonMessage2 = message.getWrappedMessage();
+ assertNotNull(protonMessage2);
+ assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount());
+ }
+
+ receiver.close();
+ connection.close();
+ }
}