You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/07/16 13:47:41 UTC

activemq git commit: AMQ-5890: prevent NPE if Modified disposition is applied without the delivery-failed flag set, add some general tests of Modified handling

Repository: activemq
Updated Branches:
  refs/heads/master c85c7c147 -> 0cfd22591


AMQ-5890: prevent NPE if Modified disposition is applied without the delivery-failed flag set, add some general tests of Modified handling

https://issues.apache.org/jira/browse/AMQ-5890


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0cfd2259
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0cfd2259
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0cfd2259

Branch: refs/heads/master
Commit: 0cfd22591260540301b18a893cd229c59e43dcf6
Parents: c85c7c1
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jul 16 12:46:24 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jul 16 12:46:24 2015 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  2 +-
 .../transport/amqp/client/AmqpMessage.java      | 36 +++--------
 .../transport/amqp/client/AmqpReceiver.java     | 11 ++--
 .../amqp/interop/AmqpReceiverTest.java          | 65 ++++++++++++++++++++
 4 files changed, 78 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 4cbf744..e0e7276 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -227,7 +227,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 settle(delivery, -1);
             } else if (state instanceof Modified) {
                 Modified modified = (Modified) state;
-                if (modified.getDeliveryFailed()) {
+                if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
                     // increment delivery counter..
                     md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 0acd1c6..e8ad793 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -140,43 +140,21 @@ public class AmqpMessage {
     }
 
     /**
-     * Rejects the message, marking it as not deliverable here and failed to deliver.
+     * Marks the message as Modified, indicating whether delivery failed and whether it is undeliverable here.
      *
-     * @throws Exception if an error occurs during the reject.
-     */
-    public void reject() throws Exception {
-        reject(true, true);
-    }
-
-    /**
-     * Rejects the message, marking it as failed to deliver and applying the given value
-     * to the undeliverable here tag.
-     *
-     * @param undeliverableHere
-     *        marks the delivery as not being able to be process by link it was sent to.
-     *
-     * @throws Exception if an error occurs during the reject.
-     */
-    public void reject(boolean undeliverableHere) throws Exception {
-        reject(undeliverableHere, true);
-    }
-
-    /**
-     * Rejects the message, marking it as not deliverable here and failed to deliver.
-     *
-     * @param undeliverableHere
-     *        marks the delivery as not being able to be process by link it was sent to.
      * @param deliveryFailed
      *        indicates that the delivery failed for some reason.
+     * @param undeliverableHere
+     *        marks the delivery as not able to be processed by the link it was sent to.
      *
-     * @throws Exception if an error occurs during the reject.
+     * @throws Exception if an error occurs during the process.
      */
-    public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception {
+    public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
         if (receiver == null) {
-            throw new IllegalStateException("Can't reject non-received message.");
+            throw new IllegalStateException("Can't modify non-received message.");
         }
 
-        receiver.reject(delivery, undeliverableHere, deliveryFailed);
+        receiver.modified(delivery, deliveryFailed, undeliverableHere);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 049fe4d..98241cd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -316,18 +316,17 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
-     * Reject a message that was dispatched under the given Delivery instance.
+     * Mark a message that was dispatched under the given Delivery instance as Modified.
      *
      * @param delivery
-     *        the Delivery instance to reject.
-     * @param undeliverableHere
-     *        marks the delivery as not being able to be process by link it was sent to.
+     *        the Delivery instance to mark modified.
      * @param deliveryFailed
      *        indicates that the delivery failed for some reason.
-     *
+     * @param undeliverableHere
+     *        marks the delivery as not able to be processed by the link it was sent to.
      * @throws IOException if an error occurs while sending the reject.
      */
-    public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException {
+    public void modified(final Delivery delivery, final Boolean deliveryFailed, final Boolean undeliverableHere) throws IOException {
         checkClosed();
 
         if (delivery == null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0cfd2259/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 30571f7..1502bda 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS
 import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
@@ -43,6 +44,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
 
 /**
@@ -412,4 +414,67 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
         connection.getStateInspector().assertValid();
         connection.close();
     }
+
+    @Test(timeout = 30000)
+    public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
+        doModifiedDispositionTestImpl(Boolean.TRUE, null);
+    }
+
+    @Test(timeout = 30000)
+    public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
+        doModifiedDispositionTestImpl(null, null);
+    }
+
+    @Test(timeout = 30000)
+    public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
+        doModifiedDispositionTestImpl(null, Boolean.TRUE);
+    }
+
+    @Test(timeout = 30000)
+    public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
+        doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE);
+    }
+
+    private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
+        int msgCount = 1;
+        sendMessages(getTestName(), msgCount, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2 * msgCount);
+
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("did not receive message first time", message);
+
+        Message protonMessage = message.getWrappedMessage();
+        assertNotNull(protonMessage);
+        assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+
+        message.modified(deliveryFailed, undeliverableHere);
+
+        if(Boolean.TRUE.equals(undeliverableHere)) {
+            message = receiver.receive(250, TimeUnit.MILLISECONDS);
+            assertNull("Should not receive message again", message);
+        } else {
+            message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull("did not receive message again", message);
+
+            int expectedDeliveryCount = 0;
+            if(Boolean.TRUE.equals(deliveryFailed)) {
+                expectedDeliveryCount = 1;
+            }
+
+            message.accept();
+
+            Message protonMessage2 = message.getWrappedMessage();
+            assertNotNull(protonMessage2);
+            assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount());
+        }
+
+        receiver.close();
+        connection.close();
+    }
 }