You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/04/04 20:27:33 UTC
qpid-jms git commit: QPIDJMS-274 Allow for RedeliveryPolicy to provide an outcome
Repository: qpid-jms
Updated Branches:
refs/heads/master adbdb85f8 -> f096ac071
QPIDJMS-274 Allow for RedeliveryPolicy to provide an outcome
Allow the redelivery policy to provide the outcome that should
be applied to a message that has exceeded the configured max
redeliveries value.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f096ac07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f096ac07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f096ac07
Branch: refs/heads/master
Commit: f096ac07180880c724ba89a056520dd3a0d7308a
Parents: adbdb85
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 4 16:21:14 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 4 16:25:06 2017 -0400
----------------------------------------------------------------------
.../apache/qpid/jms/JmsAcknowledgeCallback.java | 21 +---
.../org/apache/qpid/jms/JmsMessageConsumer.java | 13 ++-
.../qpid/jms/message/JmsMessageSupport.java | 20 ++++
.../jms/policy/JmsDefaultRedeliveryPolicy.java | 40 +++++--
.../qpid/jms/policy/JmsRedeliveryPolicy.java | 26 ++++-
.../qpid/jms/provider/ProviderConstants.java | 3 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 15 +--
.../ConnectionFactoryIntegrationTest.java | 5 +
.../integration/ConsumerIntegrationTest.java | 105 +++++++++++++++++++
9 files changed, 207 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
index cc60b56..7af2ca7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
@@ -17,12 +17,10 @@
package org.apache.qpid.jms;
import static org.apache.qpid.jms.message.JmsMessageSupport.ACCEPTED;
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
import javax.jms.JMSException;
-import org.apache.qpid.jms.message.JmsMessageSupport;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-
public final class JmsAcknowledgeCallback {
private final JmsSession session;
@@ -40,23 +38,6 @@ public final class JmsAcknowledgeCallback {
session.acknowledge(lookupAckTypeForDisposition(getAckType()));
}
- private ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
- switch (dispositionType) {
- case JmsMessageSupport.ACCEPTED:
- return ACK_TYPE.ACCEPTED;
- case JmsMessageSupport.REJECTED:
- return ACK_TYPE.REJECTED;
- case JmsMessageSupport.RELEASED:
- return ACK_TYPE.RELEASED;
- case JmsMessageSupport.MODIFIED_FAILED:
- return ACK_TYPE.MODIFIED_FAILED;
- case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
- return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
- default:
- throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
- }
- }
-
/**
* @return true if the acknowledgement type was updated.
*/
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 29446af..b9170e0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms;
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -323,7 +325,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
performPullIfRequired(timeout, false);
} else if (redeliveryExceeded(envelope)) {
LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
- doAckUndeliverable(envelope);
+ applyRedeliveryPolicyOutcome(envelope);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
@@ -428,16 +430,17 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
private void doAckExpired(final JmsInboundMessageDispatch envelope) throws JMSException {
try {
- session.acknowledge(envelope, ACK_TYPE.EXPIRED);
+ session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
}
}
- private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) throws JMSException {
+ private void applyRedeliveryPolicyOutcome(final JmsInboundMessageDispatch envelope) throws JMSException {
try {
- session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+ JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
+ session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(getDestination())));
} catch (JMSException ex) {
session.onException(ex);
throw ex;
@@ -712,7 +715,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
doAckExpired(envelope);
} else if (redeliveryExceeded(envelope)) {
LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
- doAckUndeliverable(envelope);
+ applyRedeliveryPolicyOutcome(envelope);
} else {
boolean deliveryFailed = false;
boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index 657542c..69924c3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -16,6 +16,10 @@
*/
package org.apache.qpid.jms.message;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
/**
* Set of common utilities and definitions useful for JMS Message handling.
*/
@@ -48,4 +52,20 @@ public class JmsMessageSupport {
public static final int MODIFIED_FAILED = 4;
public static final int MODIFIED_FAILED_UNDELIVERABLE = 5;
+ public static ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
+ switch (dispositionType) {
+ case JmsMessageSupport.ACCEPTED:
+ return ACK_TYPE.ACCEPTED;
+ case JmsMessageSupport.REJECTED:
+ return ACK_TYPE.REJECTED;
+ case JmsMessageSupport.RELEASED:
+ return ACK_TYPE.RELEASED;
+ case JmsMessageSupport.MODIFIED_FAILED:
+ return ACK_TYPE.MODIFIED_FAILED;
+ case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
+ return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
+ default:
+ throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
index e9869b3..9216fd8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.policy;
import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsMessageSupport;
/**
* Defines the policy used to manage redelivered and recovered Messages.
@@ -24,15 +25,19 @@ import org.apache.qpid.jms.JmsDestination;
public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
public static final int DEFAULT_MAX_REDELIVERIES = -1;
+ public static final int DEFAULT_OUTCOME = JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE;
private int maxRedeliveries;
+ private int outcome;
public JmsDefaultRedeliveryPolicy() {
maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
+ outcome = DEFAULT_OUTCOME;
}
public JmsDefaultRedeliveryPolicy(JmsDefaultRedeliveryPolicy source) {
maxRedeliveries = source.maxRedeliveries;
+ outcome = source.outcome;
}
@Override
@@ -45,12 +50,37 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
return maxRedeliveries;
}
+ @Override
+ public int getOutcome(JmsDestination destination) {
+ return outcome;
+ }
+
+ /**
+ * Returns the configured default outcome that will be used when rejecting messages.
+ * <p>
+ * Default acknowledgement type is Modified with Undeliverable here set to true.
+ *
+ * @return the default outcome used when rejecting messages.
+ */
+ public int getOutcome() {
+ return outcome;
+ }
+
+ /**
+ * Set the default outcome to use when rejecting messages.
+ *
+ * @param outcome
+ * the default outcome applied to a rejected delivery.
+ */
+ public void setOutcome(int outcome) {
+ this.outcome = outcome;
+ }
+
/**
* Returns the configured maximum redeliveries that a message will be
* allowed to have before it is rejected by this client.
*
- * @return the maxRedeliveries
- * the maximum number of redeliveries allowed before a message is rejected.
+ * @return the maximum number of redeliveries allowed before a message is rejected.
*/
public int getMaxRedeliveries() {
return maxRedeliveries;
@@ -73,6 +103,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
final int prime = 31;
int result = 1;
result = prime * result + maxRedeliveries;
+ result = prime * result + outcome;
return result;
}
@@ -91,10 +122,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
}
JmsDefaultRedeliveryPolicy other = (JmsDefaultRedeliveryPolicy) obj;
- if (maxRedeliveries != other.maxRedeliveries) {
- return false;
- }
- return true;
+ return maxRedeliveries == other.maxRedeliveries && outcome == other.outcome;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
index 24f0eec..55f2097 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
@@ -36,9 +36,31 @@ public interface JmsRedeliveryPolicy {
* @param destination
* the destination that the subscription is redelivering from.
*
- * @return the maxRedeliveries
- * the maximum number of redeliveries allowed before a message is rejected.
+ * @return the maximum number of redeliveries allowed before a message is rejected.
*/
int getMaxRedeliveries(JmsDestination destination);
+ /**
+ * Returns the configured outcome that will be used when rejecting the
+ * message by this client for the given destination when the message has
+ * reached the maximum redelivery threshold.
+ *
+ * The outcome returned here maps to AMQP outcomes using the following
+ * integer values:
+ *
+ * <p><ul>
+ * <li>ACCEPTED = 1
+ * <li>REJECTED = 2
+ * <li>RELEASED = 3
+ * <li>MODIFIED_FAILED = 4
+ * <li>MODIFIED_FAILED_UNDELIVERABLE = 5
+ * </ul><p>
+ *
+ * @param destination
+ * the destination that the subscription is redelivering from.
+ *
+ * @return the outcome to use when rejecting messages.
+ */
+ int getOutcome(JmsDestination destination);
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index ddd157c..d00f35f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -31,7 +31,6 @@ public final class ProviderConstants {
MODIFIED_FAILED,
MODIFIED_FAILED_UNDELIVERABLE,
// Conceptual
- DELIVERED,
- EXPIRED;
+ DELIVERED
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 99e9057..515defa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
@@ -304,10 +305,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
} else {
delivery.settle();
}
+ } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED)) {
+ settleDelivery(delivery, MODIFIED_FAILED);
} else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
- deliveryFailedUndeliverable(delivery);
- } else if (ackType.equals(ACK_TYPE.EXPIRED)) {
- deliveryFailedUndeliverable(delivery);
+ settleDelivery(delivery, MODIFIED_FAILED_UNDELIVERABLE);
+ } else if (ackType.equals(ACK_TYPE.REJECTED)) {
+ settleDelivery(delivery, REJECTED);
} else if (ackType.equals(ACK_TYPE.RELEASED)) {
delivery.disposition(Released.getInstance());
delivery.settle();
@@ -489,7 +492,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// In the future once the JMS mapping is complete we should be
// able to convert everything to some message even if its just
// a bytes messages as a fall back.
- deliveryFailedUndeliverable(incoming);
+ settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
return false;
}
@@ -559,8 +562,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
return "AmqpConsumer { " + getResourceInfo().getId() + " }";
}
- protected void deliveryFailedUndeliverable(Delivery incoming) {
- incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+ protected void settleDelivery(Delivery incoming, DeliveryState state) {
+ incoming.disposition(state);
incoming.settle();
// TODO: this flows credit, which we might not want, e.g if
// a drain was issued to stop the link.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index 3897186..d3331d2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -548,5 +548,10 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
public int getMaxRedeliveries(JmsDestination destination) {
return JmsDefaultRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES;
}
+
+ @Override
+ public int getOutcome(JmsDestination destination) {
+ return JmsDefaultRedeliveryPolicy.DEFAULT_OUTCOME;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f096ac07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index a748e1a..5275487 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -58,7 +58,10 @@ import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -67,6 +70,7 @@ import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -1628,4 +1632,105 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Test(timeout=20000)
+ public void testRedeliveryPolicyOutcomeAppliedAccepted() throws Exception {
+ doTestRedeliveryPolicyOutcomeApplied(1);
+ }
+
+ @Test(timeout=20000)
+ public void testRedeliveryPolicyOutcomeAppliedRejected() throws Exception {
+ doTestRedeliveryPolicyOutcomeApplied(2);
+ }
+
+ @Test(timeout=20000)
+ public void testRedeliveryPolicyOutcomeAppliedReleased() throws Exception {
+ doTestRedeliveryPolicyOutcomeApplied(3);
+ }
+
+ @Test(timeout=20000)
+ public void testRedeliveryPolicyOutcomeAppliedModifiedFailed() throws Exception {
+ doTestRedeliveryPolicyOutcomeApplied(4);
+ }
+
+ @Test(timeout=20000)
+ public void testRedeliveryPolicyOutcomeAppliedModifiedFailedUndeliverable() throws Exception {
+ doTestRedeliveryPolicyOutcomeApplied(5);
+ }
+
+ private void doTestRedeliveryPolicyOutcomeApplied(int outcome) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ Connection connection = testFixture.establishConnecton(testPeer,
+ "?jms.redeliveryPolicy.maxRedeliveries=1&jms.redeliveryPolicy.outcome=" + outcome);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ HeaderDescribedType header = new HeaderDescribedType();
+ header.setDeliveryCount(new UnsignedInteger(2));
+
+ testPeer.expectReceiverAttach();
+ // Send some messages that have exceeded the specified re-delivery count
+ testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("redelivered-content"), 1);
+ // Send a message that has not exceeded the delivery count
+ String expectedContent = "not-redelivered";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(expectedContent), 2);
+
+ Matcher<?> outcomeMatcher = null;
+
+ // Expect a disposition matching the given outcome index:
+ //
+ // ACCEPTED = 1
+ // REJECTED = 2
+ // RELEASED = 3
+ // MODIFIED_FAILED = 4
+ // MODIFIED_FAILED_UNDELIVERABLE = 5
+ switch (outcome) {
+ case 1:
+ outcomeMatcher = new AcceptedMatcher();
+ break;
+ case 2:
+ outcomeMatcher = new RejectedMatcher();
+ break;
+ case 3:
+ outcomeMatcher = new ReleasedMatcher();
+ break;
+ case 4:
+ ModifiedMatcher failed = new ModifiedMatcher();
+ failed.withDeliveryFailed(equalTo(true));
+ outcomeMatcher = failed;
+ break;
+ case 5:
+ ModifiedMatcher undeliverable = new ModifiedMatcher();
+ undeliverable.withDeliveryFailed(equalTo(true));
+ undeliverable.withUndeliverableHere(equalTo(true));
+ outcomeMatcher = undeliverable;
+ break;
+ default:
+ throw new IllegalArgumentException("Test passed invalid outcome value");
+ }
+
+ // Expect a settled disposition matching the configured redelivery policy outcome
+ testPeer.expectDisposition(true, outcomeMatcher);
+
+ // Then expect an Accepted disposition for the good message
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message m = consumer.receive(6000);
+ assertNotNull("Should have reiceved the final message", m);
+ assertTrue("Should have received the final message", m instanceof TextMessage);
+ assertEquals("Unexpected content", expectedContent, ((TextMessage)m).getText());
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org