You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/06/10 17:26:35 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-70
Repository: qpid-jms
Updated Branches:
refs/heads/master 6667bcffc -> e60d3bd40
https://issues.apache.org/jira/browse/QPIDJMS-70
Adds a redelivery policy that contains configuration for max
redeliveries before the cleint reject an incomnig message.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e60d3bd4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e60d3bd4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e60d3bd4
Branch: refs/heads/master
Commit: e60d3bd40796d04c2d280a651c505b1171a6a1d8
Parents: 6667bcf
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 10 11:26:11 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 10 11:26:11 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 11 +-
.../apache/qpid/jms/JmsConnectionFactory.java | 20 ++++
.../org/apache/qpid/jms/JmsMessageConsumer.java | 24 +++--
.../org/apache/qpid/jms/JmsPrefetchPolicy.java | 9 ++
.../apache/qpid/jms/JmsRedeliveryPolicy.java | 62 +++++++++++
.../apache/qpid/jms/meta/JmsConsumerInfo.java | 11 ++
.../qpid/jms/provider/amqp/AmqpConsumer.java | 43 ++++----
.../qpid/jms/JmsConnectionFactoryTest.java | 29 ++++++
.../org/apache/qpid/jms/JmsConnectionTest.java | 3 +-
.../jms/integration/SessionIntegrationTest.java | 36 +++++++
.../JmsTransactionRedeliveryPolicyTest.java | 103 +++++++++++++++++++
11 files changed, 319 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index bd8ebbd..441fef5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -97,6 +97,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
private URI configuredURI;
private URI connectedURI;
private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+ private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
private boolean localMessagePriority;
private boolean clientIdSet;
private boolean sendAcksAsync;
@@ -858,7 +859,15 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
}
public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
- this.prefetchPolicy = prefetchPolicy;
+ this.prefetchPolicy = prefetchPolicy.copy();
+ }
+
+ public JmsRedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy.copy();
}
public boolean isLocalMessagePriority() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index dbeec40..a71df04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -74,6 +74,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
private ExceptionListener exceptionListener;
private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+ private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
public JmsConnectionFactory() {
}
@@ -505,6 +506,25 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
}
/**
+ * Returns the JmsRedeliveryPolicy that is applied when a new connection is created.
+ *
+ * @return the redeliveryPolicy that is currently configured for this factory.
+ */
+ public JmsRedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ /**
+ * Sets the JmsRedeliveryPolicy that is applied when a new connection is created.
+ *
+ * @param redeliveryPolicy
+ * The new redeliveryPolicy to set
+ */
+ public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ /**
* @return the currently configured client ID prefix for auto-generated client IDs.
*/
public synchronized String getClientIDPrefix() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b261fbd..7cc3ec6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -102,17 +102,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
this.messageQueue = new FifoMessageQueue();
}
- JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy();
-
- this.consumerInfo = new JmsConsumerInfo(consumerId);
- this.consumerInfo.setClientId(connection.getClientID());
- this.consumerInfo.setSelector(selector);
- this.consumerInfo.setSubscriptionName(name);
- this.consumerInfo.setDestination(destination);
- this.consumerInfo.setAcknowledgementMode(acknowledgementMode);
- this.consumerInfo.setNoLocal(noLocal);
- this.consumerInfo.setBrowser(isBrowser());
- this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+ JmsPrefetchPolicy policy = connection.getPrefetchPolicy();
+ JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy();
+
+ consumerInfo = new JmsConsumerInfo(consumerId);
+ consumerInfo.setClientId(connection.getClientID());
+ consumerInfo.setSelector(selector);
+ consumerInfo.setSubscriptionName(name);
+ consumerInfo.setDestination(destination);
+ consumerInfo.setAcknowledgementMode(acknowledgementMode);
+ consumerInfo.setNoLocal(noLocal);
+ consumerInfo.setBrowser(isBrowser());
+ consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+ consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
session.getConnection().createResource(consumerInfo);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
index a8586d2..a8165ac 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
@@ -63,6 +63,15 @@ public class JmsPrefetchPolicy {
}
/**
+ * Copy this policy into a newly allocated instance.
+ *
+ * @return a new JmsPrefetchPolicy that is a copy of this one.
+ */
+ public JmsPrefetchPolicy copy() {
+ return new JmsPrefetchPolicy(this);
+ }
+
+ /**
* @return Returns the durableTopicPrefetch.
*/
public int getDurableTopicPrefetch() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
new file mode 100644
index 0000000..5201fc8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Defines the policy used to manage redelivered and recovered Messages.
+ */
+public class JmsRedeliveryPolicy {
+
+ public static final int DEFAULT_MAX_REDELIVERIES = -1;
+
+ private int maxRedeliveries;
+
+ public JmsRedeliveryPolicy() {
+ maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
+ }
+
+ public JmsRedeliveryPolicy(JmsRedeliveryPolicy source) {
+ maxRedeliveries = source.maxRedeliveries;
+ }
+
+ public JmsRedeliveryPolicy copy() {
+ return new JmsRedeliveryPolicy(this);
+ }
+
+ /**
+ * Returns the configured maximum redeliveries that a message will be
+ * allowed to have before it is rejected by this client.
+ *
+ * @return the maxRedeliveries
+ * the maximum number of redeliveries allowed before a message is rejected.
+ */
+ public int getMaxRedeliveries() {
+ return maxRedeliveries;
+ }
+
+ /**
+ * Configures the maximum number of time a message can be redelivered before it
+ * will be rejected by this client.
+ *
+ * The default value of (-1) disables max redelivery processing.
+ *
+ * @param maxRedeliveries the maxRedeliveries to set
+ */
+ public void setMaxRedeliveries(int maxRedeliveries) {
+ this.maxRedeliveries = maxRedeliveries;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 192a6b4..a7e7ad0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.meta;
import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsRedeliveryPolicy;
import org.apache.qpid.jms.util.ToStringSupport;
public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsumerInfo> {
@@ -31,6 +32,8 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
protected boolean noLocal;
protected int acknowledgementMode;
+ protected JmsRedeliveryPolicy redeliveryPolicy;
+
// Can be used to track the last consumed message.
private transient long lastDeliveredSequenceId;
@@ -150,6 +153,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
this.acknowledgementMode = acknowledgementMode;
}
+ public JmsRedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
@Override
public String toString() {
return ToStringSupport.toString(this);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index fcdd0f3..46212af 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -81,7 +81,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
- private final AtomicLong _incomingSequence = new AtomicLong(0);
+ private final AtomicLong incomingSequence = new AtomicLong(0);
private AsyncResult stopRequest;
@@ -115,7 +115,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
stopRequest = request;
}
} else {
- //TODO: We dont actually want the additional messages that could be sent while
+ // TODO: We dont actually want the additional messages that could be sent while
// draining. We could explicitly reduce credit first, or possibly use 'echo' instead
// of drain if it was supported. We would first need to understand what happens
// if we reduce credit below the number of messages already in-flight before
@@ -321,7 +321,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
} else if (ackType.equals(ACK_TYPE.POISONED)) {
- deliveryFailed(delivery, false);
+ deliveryFailed(delivery);
} else if (ackType.equals(ACK_TYPE.RELEASED)) {
delivery.disposition(Released.getInstance());
delivery.settle();
@@ -387,14 +387,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
do {
incoming = getEndpoint().current();
if (incoming != null) {
- if(incoming.isReadable() && !incoming.isPartial()) {
+ if (incoming.isReadable() && !incoming.isPartial()) {
LOG.trace("{} has incoming Message(s).", this);
try {
processDelivery(incoming);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
- getEndpoint().advance();
} else {
LOG.trace("{} has a partial incoming Message(s), deferring.", this);
incoming = null;
@@ -402,12 +401,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
} else {
// We have exhausted the locally queued messages on this link.
// Check if we tried to stop and have now run out of credit.
- if(stopRequest != null) {
- if(getEndpoint().getRemoteCredit() <= 0)
- {
- stopRequest.onSuccess();
- stopRequest = null;
- }
+ if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) {
+ stopRequest.onSuccess();
+ stopRequest = null;
}
}
} while (incoming != null);
@@ -416,9 +412,22 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
private void processDelivery(Delivery incoming) throws Exception {
+
+ Message amqpMessage = decodeIncomingMessage(incoming);
+ long deliveryCount = amqpMessage.getDeliveryCount();
+ int maxRedeliveries = getJmsResource().getRedeliveryPolicy().getMaxRedeliveries();
+
+ if (maxRedeliveries >= 0 && deliveryCount > maxRedeliveries) {
+ LOG.trace("{} rejecting delivery that exceeds max redelivery count. {}", this, amqpMessage.getMessageId());
+ deliveryFailed(incoming);
+ return;
+ } else {
+ getEndpoint().advance();
+ }
+
JmsMessage message = null;
try {
- message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming));
+ message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage);
} catch (Exception e) {
LOG.warn("Error on transform: {}", e.getMessage());
// TODO - We could signal provider error but not sure we want to fail
@@ -426,7 +435,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// In the future once the JMS mapping is complete we should be
// able to convert everything to some message even if its just
// a bytes messages as a fall back.
- deliveryFailed(incoming, true);
+ deliveryFailed(incoming);
return;
}
@@ -449,7 +458,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
protected long getNextIncomingSequenceNumber() {
- return _incomingSequence.incrementAndGet();
+ return incomingSequence.incrementAndGet();
}
@Override
@@ -498,15 +507,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
return "AmqpConsumer { " + this.resource.getConsumerId() + " }";
}
- protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+ protected void deliveryFailed(Delivery incoming) {
Modified disposition = new Modified();
disposition.setUndeliverableHere(true);
disposition.setDeliveryFailed(true);
incoming.disposition(disposition);
incoming.settle();
- if (expandCredit) {
- getEndpoint().flow(1);
- }
+ sendFlowIfNeeded();
}
protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
index c645920..75317cb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
@@ -344,6 +344,35 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase {
assertEquals("Properties were not equal", props, props2);
}
+ /**
+ * The redelivery policy is maintained in a child-object, which we extract the properties from
+ * when serializing the factory. Ensure this functions by doing a round trip on a factory
+ * configured with some new redelivery configuration via the URI.
+ */
+ @Test
+ public void testSerializeThenDeserializeMaintainsRedeliveryPolicy() throws Exception {
+ String maxRedeliveryValue = "5";
+ String maxRedeliveryKey = "redeliveryPolicy.maxRedeliveries";
+ String uri = "amqp://localhost:1234?jms." + maxRedeliveryKey + "=" + maxRedeliveryValue;
+
+ JmsConnectionFactory cf = new JmsConnectionFactory(uri);
+ Map<String, String> props = cf.getProperties();
+
+ assertTrue("Props dont contain expected redelivery policy change", props.containsKey(maxRedeliveryKey));
+ assertEquals("Unexpected value", maxRedeliveryValue, props.get(maxRedeliveryKey));
+
+ Object roundTripped = roundTripSerialize(cf);
+
+ assertNotNull("Null object returned", roundTripped);
+ assertEquals("Unexpected type", JmsConnectionFactory.class, roundTripped.getClass());
+
+ Map<String, String> props2 = ((JmsConnectionFactory)roundTripped).getProperties();
+ assertTrue("Props dont contain expected redelivery policy change", props2.containsKey(maxRedeliveryKey));
+ assertEquals("Unexpected value", maxRedeliveryValue, props2.get(maxRedeliveryKey));
+
+ assertEquals("Properties were not equal", props, props2);
+ }
+
@Test
public void testSerializeTwoConnectionFactories() throws Exception {
String uri = "amqp://localhost:1234";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 231a8de..fe5480d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -105,7 +104,7 @@ public class JmsConnectionTest {
assertNotSame(newPolicy, connection.getPrefetchPolicy());
connection.setPrefetchPolicy(newPolicy);
- assertSame(newPolicy, connection.getPrefetchPolicy());
+ assertEquals(newPolicy, connection.getPrefetchPolicy());
}
@Test(timeout=30000)
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index f6bd0d8..e87c34d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -66,6 +66,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
@@ -898,6 +899,41 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=5000)
+ public void testIncomingMessageExceedsMaxRedeliveries() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final int COUNT = 5;
+
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ ((JmsConnection) connection).getRedeliveryPolicy().setMaxRedeliveries(1);
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ HeaderDescribedType header = new HeaderDescribedType();
+ header.setDeliveryCount(new UnsignedInteger(2));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("content"), COUNT);
+
+ for (int i = 0; i < COUNT; i++) {
+ // Then expect an *settled* Modified disposition that rejects each message once
+ ModifiedMatcher modified = new ModifiedMatcher();
+ modified.withDeliveryFailed(equalTo(true));
+ modified.withUndeliverableHere(equalTo(true));
+ testPeer.expectDisposition(true, modified);
+ }
+
+ session.createConsumer(queue);
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=5000)
public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
new file mode 100644
index 0000000..5bd4d1e
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * test redelivery policy application in a TX session.
+ */
+public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport {
+
+ @Override
+ public String getAmqpConnectionURIOptions() {
+ return "jms.redeliveryPolicy.maxRedeliveries=5";
+ }
+
+ @Test(timeout = 30000)
+ public void testConsumeAndRollbackWithMaxRedeliveries() throws Exception {
+ final int MAX_REDELIVERIES = 5;
+ final int MSG_COUNT = 5;
+
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ sendMessages(connection, queue, MSG_COUNT);
+
+ final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+ // Consume the message for the first time.
+ Message incoming = null;
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ incoming = consumer.receive(2000);
+ assertNotNull(incoming);
+ assertFalse(incoming.getJMSRedelivered());
+ assertTrue(incoming instanceof TextMessage);
+ }
+ session.rollback();
+
+ for (int i = 0; i < MAX_REDELIVERIES; ++i) {
+ LOG.info("Queue size before consume is: {}", queueView.getQueueSize());
+ assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+ for (int j = 0; j < MSG_COUNT; ++j) {
+ incoming = consumer.receive(2000);
+ assertNotNull(incoming);
+ assertTrue(incoming.getJMSRedelivered());
+ assertTrue(incoming instanceof TextMessage);
+ }
+
+ assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+ session.rollback();
+ LOG.info("Queue size after session rollback is: {}", queueView.getQueueSize());
+ }
+
+ assertTrue("Message should get DLQ'd", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 0;
+ }
+ }));
+
+ QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ");
+ assertEquals(MSG_COUNT, dlq.getQueueSize());
+
+ assertNull(consumer.receive(50));
+
+ session.commit();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org