You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/04/04 20:27:33 UTC

qpid-jms git commit: QPIDJMS-274 Allow for RedeliveryPolicy to provide an outcome

Repository: qpid-jms
Updated Branches:
  refs/heads/master adbdb85f8 -> f096ac071


QPIDJMS-274 Allow for RedeliveryPolicy to provide an outcome

Allow the redelivery policy to provide the outcome that should
be applied to a message that has exceeded the configured max
redeliveries value.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f096ac07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f096ac07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f096ac07

Branch: refs/heads/master
Commit: f096ac07180880c724ba89a056520dd3a0d7308a
Parents: adbdb85
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 4 16:21:14 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 4 16:25:06 2017 -0400

----------------------------------------------------------------------
 .../apache/qpid/jms/JmsAcknowledgeCallback.java |  21 +---
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  13 ++-
 .../qpid/jms/message/JmsMessageSupport.java     |  20 ++++
 .../jms/policy/JmsDefaultRedeliveryPolicy.java  |  40 +++++--
 .../qpid/jms/policy/JmsRedeliveryPolicy.java    |  26 ++++-
 .../qpid/jms/provider/ProviderConstants.java    |   3 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  15 +--
 .../ConnectionFactoryIntegrationTest.java       |   5 +
 .../integration/ConsumerIntegrationTest.java    | 105 +++++++++++++++++++
 9 files changed, 207 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
index cc60b56..7af2ca7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
@@ -17,12 +17,10 @@
 package org.apache.qpid.jms;
 
 import static org.apache.qpid.jms.message.JmsMessageSupport.ACCEPTED;
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
 
 import javax.jms.JMSException;
 
-import org.apache.qpid.jms.message.JmsMessageSupport;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-
 public final class JmsAcknowledgeCallback {
 
     private final JmsSession session;
@@ -40,23 +38,6 @@ public final class JmsAcknowledgeCallback {
         session.acknowledge(lookupAckTypeForDisposition(getAckType()));
     }
 
-    private ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
-        switch (dispositionType) {
-            case JmsMessageSupport.ACCEPTED:
-                return ACK_TYPE.ACCEPTED;
-            case JmsMessageSupport.REJECTED:
-                return ACK_TYPE.REJECTED;
-            case JmsMessageSupport.RELEASED:
-                return ACK_TYPE.RELEASED;
-            case JmsMessageSupport.MODIFIED_FAILED:
-                return ACK_TYPE.MODIFIED_FAILED;
-            case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
-                return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
-            default:
-                throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
-        }
-    }
-
     /**
      * @return true if the acknowledgement type was updated.
      */

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 29446af..b9170e0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms;
 
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -323,7 +325,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     performPullIfRequired(timeout, false);
                 } else if (redeliveryExceeded(envelope)) {
                     LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
-                    doAckUndeliverable(envelope);
+                    applyRedeliveryPolicyOutcome(envelope);
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
@@ -428,16 +430,17 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
     private void doAckExpired(final JmsInboundMessageDispatch envelope) throws JMSException {
         try {
-            session.acknowledge(envelope, ACK_TYPE.EXPIRED);
+            session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
         } catch (JMSException ex) {
             session.onException(ex);
             throw ex;
         }
     }
 
-    private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) throws JMSException {
+    private void applyRedeliveryPolicyOutcome(final JmsInboundMessageDispatch envelope) throws JMSException {
         try {
-            session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+            JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
+            session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(getDestination())));
         } catch (JMSException ex) {
             session.onException(ex);
             throw ex;
@@ -712,7 +715,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     doAckExpired(envelope);
                 } else if (redeliveryExceeded(envelope)) {
                     LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
-                    doAckUndeliverable(envelope);
+                    applyRedeliveryPolicyOutcome(envelope);
                 } else {
                     boolean deliveryFailed = false;
                     boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index 657542c..69924c3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -16,6 +16,10 @@
  */
 package org.apache.qpid.jms.message;
 
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
 /**
  * Set of common utilities and definitions useful for JMS Message handling.
  */
@@ -48,4 +52,20 @@ public class JmsMessageSupport {
     public static final int MODIFIED_FAILED = 4;
     public static final int MODIFIED_FAILED_UNDELIVERABLE = 5;
 
+    public static ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
+        switch (dispositionType) {
+            case JmsMessageSupport.ACCEPTED:
+                return ACK_TYPE.ACCEPTED;
+            case JmsMessageSupport.REJECTED:
+                return ACK_TYPE.REJECTED;
+            case JmsMessageSupport.RELEASED:
+                return ACK_TYPE.RELEASED;
+            case JmsMessageSupport.MODIFIED_FAILED:
+                return ACK_TYPE.MODIFIED_FAILED;
+            case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
+                return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
+            default:
+                throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
index e9869b3..9216fd8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.policy;
 
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsMessageSupport;
 
 /**
  * Defines the policy used to manage redelivered and recovered Messages.
@@ -24,15 +25,19 @@ import org.apache.qpid.jms.JmsDestination;
 public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
 
     public static final int DEFAULT_MAX_REDELIVERIES = -1;
+    public static final int DEFAULT_OUTCOME = JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE;
 
     private int maxRedeliveries;
+    private int outcome;
 
     public JmsDefaultRedeliveryPolicy() {
         maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
+        outcome = DEFAULT_OUTCOME;
     }
 
     public JmsDefaultRedeliveryPolicy(JmsDefaultRedeliveryPolicy source) {
         maxRedeliveries = source.maxRedeliveries;
+        outcome = source.outcome;
     }
 
     @Override
@@ -45,12 +50,37 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
         return maxRedeliveries;
     }
 
+    @Override
+    public int getOutcome(JmsDestination destination) {
+        return outcome;
+    }
+
+    /**
+     * Returns the configured default outcome that will be used when rejecting messages.
+     * <p>
+     * Default acknowledgement type is Modified with Undeliverable here set to true.
+     *
+     * @return the default outcome used when rejecting messages.
+     */
+    public int getOutcome() {
+        return outcome;
+    }
+
+    /**
+     * Set the default outcome to use when rejecting messages.
+     *
+     * @param outcome
+     * 		the default outcome applied to a rejected delivery.
+     */
+    public void setOutcome(int outcome) {
+        this.outcome = outcome;
+    }
+
     /**
      * Returns the configured maximum redeliveries that a message will be
      * allowed to have before it is rejected by this client.
      *
-     * @return the maxRedeliveries
-     *         the maximum number of redeliveries allowed before a message is rejected.
+     * @return the maximum number of redeliveries allowed before a message is rejected.
      */
     public int getMaxRedeliveries() {
         return maxRedeliveries;
@@ -73,6 +103,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
         final int prime = 31;
         int result = 1;
         result = prime * result + maxRedeliveries;
+        result = prime * result + outcome;
         return result;
     }
 
@@ -91,10 +122,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
         }
 
         JmsDefaultRedeliveryPolicy other = (JmsDefaultRedeliveryPolicy) obj;
-        if (maxRedeliveries != other.maxRedeliveries) {
-            return false;
-        }
 
-        return true;
+        return maxRedeliveries == other.maxRedeliveries && outcome == other.outcome;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
index 24f0eec..55f2097 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
@@ -36,9 +36,31 @@ public interface JmsRedeliveryPolicy {
      * @param destination
      *      the destination that the subscription is redelivering from.
      *
-     * @return the maxRedeliveries
-     *         the maximum number of redeliveries allowed before a message is rejected.
+     * @return the maximum number of redeliveries allowed before a message is rejected.
      */
     int getMaxRedeliveries(JmsDestination destination);
 
+    /**
+     * Returns the configured outcome that will be used when rejecting the
+     * message by this client for the given destination when the message has
+     * reached the maximum redelivery threshold.
+     *
+     * The outcome returned here maps to AMQP outcomes using the following
+     * integer values:
+     *
+     * <p><ul>
+     *  <li>ACCEPTED = 1
+     *  <li>REJECTED = 2
+     *  <li>RELEASED = 3
+     *  <li>MODIFIED_FAILED = 4
+     *  <li>MODIFIED_FAILED_UNDELIVERABLE = 5
+     * </ul><p>
+     *
+     * @param destination
+     *      the destination that the subscription is redelivering from.
+     *
+     * @return the outcome to use when rejecting messages.
+     */
+    int getOutcome(JmsDestination destination);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index ddd157c..d00f35f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -31,7 +31,6 @@ public final class ProviderConstants {
         MODIFIED_FAILED,
         MODIFIED_FAILED_UNDELIVERABLE,
         // Conceptual
-        DELIVERED,
-        EXPIRED;
+        DELIVERED
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 99e9057..515defa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
@@ -304,10 +305,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             } else {
                 delivery.settle();
             }
+        } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED)) {
+            settleDelivery(delivery, MODIFIED_FAILED);
         } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
-            deliveryFailedUndeliverable(delivery);
-        } else if (ackType.equals(ACK_TYPE.EXPIRED)) {
-            deliveryFailedUndeliverable(delivery);
+            settleDelivery(delivery, MODIFIED_FAILED_UNDELIVERABLE);
+        } else if (ackType.equals(ACK_TYPE.REJECTED)) {
+            settleDelivery(delivery, REJECTED);
         } else if (ackType.equals(ACK_TYPE.RELEASED)) {
             delivery.disposition(Released.getInstance());
             delivery.settle();
@@ -489,7 +492,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        In the future once the JMS mapping is complete we should be
             //        able to convert everything to some message even if its just
             //        a bytes messages as a fall back.
-            deliveryFailedUndeliverable(incoming);
+            settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
             return false;
         }
 
@@ -559,8 +562,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         return "AmqpConsumer { " + getResourceInfo().getId() + " }";
     }
 
-    protected void deliveryFailedUndeliverable(Delivery incoming) {
-        incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+    protected void settleDelivery(Delivery incoming, DeliveryState state) {
+        incoming.disposition(state);
         incoming.settle();
         // TODO: this flows credit, which we might not want, e.g if
         // a drain was issued to stop the link.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index 3897186..d3331d2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -548,5 +548,10 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         public int getMaxRedeliveries(JmsDestination destination) {
             return JmsDefaultRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES;
         }
+
+        @Override
+        public int getOutcome(JmsDestination destination) {
+            return JmsDefaultRedeliveryPolicy.DEFAULT_OUTCOME;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index a748e1a..5275487 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -58,7 +58,10 @@ import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -67,6 +70,7 @@ import org.apache.qpid.jms.util.QpidJMSTestRunner;
 import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -1628,4 +1632,105 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testRedeliveryPolicyOutcomeAppliedAccepted() throws Exception {
+        doTestRedeliveryPolicyOutcomeApplied(1);
+    }
+
+    @Test(timeout=20000)
+    public void testRedeliveryPolicyOutcomeAppliedRejected() throws Exception {
+        doTestRedeliveryPolicyOutcomeApplied(2);
+    }
+
+    @Test(timeout=20000)
+    public void testRedeliveryPolicyOutcomeAppliedReleased() throws Exception {
+        doTestRedeliveryPolicyOutcomeApplied(3);
+    }
+
+    @Test(timeout=20000)
+    public void testRedeliveryPolicyOutcomeAppliedModifiedFailed() throws Exception {
+        doTestRedeliveryPolicyOutcomeApplied(4);
+    }
+
+    @Test(timeout=20000)
+    public void testRedeliveryPolicyOutcomeAppliedModifiedFailedUndeliverable() throws Exception {
+        doTestRedeliveryPolicyOutcomeApplied(5);
+    }
+
+    private void doTestRedeliveryPolicyOutcomeApplied(int outcome) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            Connection connection = testFixture.establishConnecton(testPeer,
+                "?jms.redeliveryPolicy.maxRedeliveries=1&jms.redeliveryPolicy.outcome=" + outcome);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            HeaderDescribedType header = new HeaderDescribedType();
+            header.setDeliveryCount(new UnsignedInteger(2));
+
+            testPeer.expectReceiverAttach();
+            // Send some messages that have exceeded the specified re-delivery count
+            testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("redelivered-content"), 1);
+            // Send a message that has not exceeded the delivery count
+            String expectedContent = "not-redelivered";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(expectedContent), 2);
+
+            Matcher<?> outcomeMatcher = null;
+
+            // Expect a disposition matching the given outcome index:
+            //
+            //  ACCEPTED = 1
+            //  REJECTED = 2
+            //  RELEASED = 3
+            //  MODIFIED_FAILED = 4
+            //  MODIFIED_FAILED_UNDELIVERABLE = 5
+            switch (outcome) {
+                case 1:
+                    outcomeMatcher = new AcceptedMatcher();
+                    break;
+                case 2:
+                    outcomeMatcher = new RejectedMatcher();
+                    break;
+                case 3:
+                    outcomeMatcher = new ReleasedMatcher();
+                    break;
+                case 4:
+                    ModifiedMatcher failed = new ModifiedMatcher();
+                    failed.withDeliveryFailed(equalTo(true));
+                    outcomeMatcher = failed;
+                    break;
+                case 5:
+                    ModifiedMatcher undeliverable = new ModifiedMatcher();
+                    undeliverable.withDeliveryFailed(equalTo(true));
+                    undeliverable.withUndeliverableHere(equalTo(true));
+                    outcomeMatcher = undeliverable;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Test passed invalid outcome value");
+            }
+
+            // Expect a settled disposition matching the configured redelivery policy outcome
+            testPeer.expectDisposition(true, outcomeMatcher);
+
+            // Then expect an Accepted disposition for the good message
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            Message m = consumer.receive(6000);
+            assertNotNull("Should have reiceved the final message", m);
+            assertTrue("Should have received the final message", m instanceof TextMessage);
+            assertEquals("Unexpected content", expectedContent, ((TextMessage)m).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org