You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/14 13:35:39 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6464

Repository: activemq
Updated Branches:
  refs/heads/master 3e237ca73 -> a9f9d4a4d


https://issues.apache.org/jira/browse/AMQ-6464

Correct handling of rejected outcome to archive the message in the DLQ

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a9f9d4a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a9f9d4a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a9f9d4a4

Branch: refs/heads/master
Commit: a9f9d4a4d2ede752afa74b22394b80822052543d
Parents: 3e237ca
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 14 09:35:17 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 14 09:35:17 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/protocol/AmqpSender.java   |  9 +++++----
 .../transport/amqp/interop/AmqpReceiverTest.java       | 13 +++----------
 2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a9f9d4a4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 149b2e8..23f8597 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -270,10 +270,11 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 }
                 settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
             } else if (state instanceof Rejected) {
-                // re-deliver /w incremented delivery counter.
-                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
-                LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
-                settle(delivery, -1);
+                // Rejection is a terminal outcome, we poison the message for dispatch to
+                // the DLQ.  If a custom redelivery policy is used on the broker the message
+                // can still be redelivered based on the configation of that policy.
+                LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state, md.getRedeliveryCounter());
+                settle(delivery, MessageAck.POSION_ACK_TYPE);
             } else if (state instanceof Released) {
                 LOG.trace("onDelivery: Released state = {}", state);
                 // re-deliver && don't increment the counter.

http://git-wip-us.apache.org/repos/asf/activemq/blob/a9f9d4a4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 2a06561..3cda78d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -514,16 +514,9 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
 
         message.reject();
 
-        // Read the message again and validate its state
-
-        message = receiver.receive(10, TimeUnit.SECONDS);
-        assertNotNull("did not receive message again", message);
-
-        message.accept();
-
-        protonMessage = message.getWrappedMessage();
-        assertNotNull(protonMessage);
-        assertEquals("Unexpected updated value for AMQP delivery-count", 1, protonMessage.getDeliveryCount());
+        // Attempt to read the message again but should not get it.
+        message = receiver.receive(2, TimeUnit.SECONDS);
+        assertNull("shoudl not receive message again", message);
 
         connection.close();
     }