You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/06/10 17:26:35 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-70

Repository: qpid-jms
Updated Branches:
  refs/heads/master 6667bcffc -> e60d3bd40


https://issues.apache.org/jira/browse/QPIDJMS-70

Adds a redelivery policy that contains configuration for the maximum number
of redeliveries allowed before the client rejects an incoming message.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e60d3bd4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e60d3bd4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e60d3bd4

Branch: refs/heads/master
Commit: e60d3bd40796d04c2d280a651c505b1171a6a1d8
Parents: 6667bcf
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 10 11:26:11 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 10 11:26:11 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  11 +-
 .../apache/qpid/jms/JmsConnectionFactory.java   |  20 ++++
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  24 +++--
 .../org/apache/qpid/jms/JmsPrefetchPolicy.java  |   9 ++
 .../apache/qpid/jms/JmsRedeliveryPolicy.java    |  62 +++++++++++
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |  11 ++
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  43 ++++----
 .../qpid/jms/JmsConnectionFactoryTest.java      |  29 ++++++
 .../org/apache/qpid/jms/JmsConnectionTest.java  |   3 +-
 .../jms/integration/SessionIntegrationTest.java |  36 +++++++
 .../JmsTransactionRedeliveryPolicyTest.java     | 103 +++++++++++++++++++
 11 files changed, 319 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index bd8ebbd..441fef5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -97,6 +97,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
     private URI configuredURI;
     private URI connectedURI;
     private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+    private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
     private boolean localMessagePriority;
     private boolean clientIdSet;
     private boolean sendAcksAsync;
@@ -858,7 +859,15 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
     }
 
     public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
-        this.prefetchPolicy = prefetchPolicy;
+        this.prefetchPolicy = prefetchPolicy.copy();
+    }
+
+    public JmsRedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy.copy();
     }
 
     public boolean isLocalMessagePriority() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index dbeec40..a71df04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -74,6 +74,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     private ExceptionListener exceptionListener;
 
     private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+    private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
 
     public JmsConnectionFactory() {
     }
@@ -505,6 +506,25 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     }
 
     /**
+     * Returns the JmsRedeliveryPolicy that is applied when a new connection is created.
+     *
+     * @return the redeliveryPolicy that is currently configured for this factory.
+     */
+    public JmsRedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    /**
+     * Sets the JmsRedeliveryPolicy that is applied when a new connection is created.
+     *
+     * @param redeliveryPolicy
+     *        The new redeliveryPolicy to set
+     */
+    public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    /**
      * @return the currently configured client ID prefix for auto-generated client IDs.
      */
     public synchronized String getClientIDPrefix() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b261fbd..7cc3ec6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -102,17 +102,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
             this.messageQueue = new FifoMessageQueue();
         }
 
-        JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy();
-
-        this.consumerInfo = new JmsConsumerInfo(consumerId);
-        this.consumerInfo.setClientId(connection.getClientID());
-        this.consumerInfo.setSelector(selector);
-        this.consumerInfo.setSubscriptionName(name);
-        this.consumerInfo.setDestination(destination);
-        this.consumerInfo.setAcknowledgementMode(acknowledgementMode);
-        this.consumerInfo.setNoLocal(noLocal);
-        this.consumerInfo.setBrowser(isBrowser());
-        this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+        JmsPrefetchPolicy policy = connection.getPrefetchPolicy();
+        JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy();
+
+        consumerInfo = new JmsConsumerInfo(consumerId);
+        consumerInfo.setClientId(connection.getClientID());
+        consumerInfo.setSelector(selector);
+        consumerInfo.setSubscriptionName(name);
+        consumerInfo.setDestination(destination);
+        consumerInfo.setAcknowledgementMode(acknowledgementMode);
+        consumerInfo.setNoLocal(noLocal);
+        consumerInfo.setBrowser(isBrowser());
+        consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+        consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
 
         session.getConnection().createResource(consumerInfo);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
index a8586d2..a8165ac 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
@@ -63,6 +63,15 @@ public class JmsPrefetchPolicy {
     }
 
     /**
+     * Copy this policy into a newly allocated instance.
+     *
+     * @return a new JmsPrefetchPolicy that is a copy of this one.
+     */
+    public JmsPrefetchPolicy copy() {
+        return new JmsPrefetchPolicy(this);
+    }
+
+    /**
      * @return Returns the durableTopicPrefetch.
      */
     public int getDurableTopicPrefetch() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
new file mode 100644
index 0000000..5201fc8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Defines the policy used to manage redelivered and recovered Messages.
+ */
+public class JmsRedeliveryPolicy {
+
+    public static final int DEFAULT_MAX_REDELIVERIES = -1;
+
+    private int maxRedeliveries;
+
+    public JmsRedeliveryPolicy() {
+        maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
+    }
+
+    public JmsRedeliveryPolicy(JmsRedeliveryPolicy source) {
+        maxRedeliveries = source.maxRedeliveries;
+    }
+
+    public JmsRedeliveryPolicy copy() {
+        return new JmsRedeliveryPolicy(this);
+    }
+
+    /**
+     * Returns the configured maximum redeliveries that a message will be
+     * allowed to have before it is rejected by this client.
+     *
+     * @return the maxRedeliveries
+     *         the maximum number of redeliveries allowed before a message is rejected.
+     */
+    public int getMaxRedeliveries() {
+        return maxRedeliveries;
+    }
+
+    /**
+     * Configures the maximum number of time a message can be redelivered before it
+     * will be rejected by this client.
+     *
+     * The default value of (-1) disables max redelivery processing.
+     *
+     * @param maxRedeliveries the maxRedeliveries to set
+     */
+    public void setMaxRedeliveries(int maxRedeliveries) {
+        this.maxRedeliveries = maxRedeliveries;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 192a6b4..a7e7ad0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.meta;
 
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.util.ToStringSupport;
 
 public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsumerInfo> {
@@ -31,6 +32,8 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
     protected boolean noLocal;
     protected int acknowledgementMode;
 
+    protected JmsRedeliveryPolicy redeliveryPolicy;
+
     // Can be used to track the last consumed message.
     private transient long lastDeliveredSequenceId;
 
@@ -150,6 +153,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         this.acknowledgementMode = acknowledgementMode;
     }
 
+    public JmsRedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
     @Override
     public String toString() {
         return ToStringSupport.toString(this);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index fcdd0f3..46212af 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -81,7 +81,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
 
-    private final AtomicLong _incomingSequence = new AtomicLong(0);
+    private final AtomicLong incomingSequence = new AtomicLong(0);
 
     private AsyncResult stopRequest;
 
@@ -115,7 +115,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 stopRequest = request;
             }
         } else {
-            //TODO: We dont actually want the additional messages that could be sent while
+            // TODO: We dont actually want the additional messages that could be sent while
             // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
             // of drain if it was supported. We would first need to understand what happens
             // if we reduce credit below the number of messages already in-flight before
@@ -321,7 +321,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 }
             }
         } else if (ackType.equals(ACK_TYPE.POISONED)) {
-            deliveryFailed(delivery, false);
+            deliveryFailed(delivery);
         } else if (ackType.equals(ACK_TYPE.RELEASED)) {
             delivery.disposition(Released.getInstance());
             delivery.settle();
@@ -387,14 +387,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         do {
             incoming = getEndpoint().current();
             if (incoming != null) {
-                if(incoming.isReadable() && !incoming.isPartial()) {
+                if (incoming.isReadable() && !incoming.isPartial()) {
                     LOG.trace("{} has incoming Message(s).", this);
                     try {
                         processDelivery(incoming);
                     } catch (Exception e) {
                         throw IOExceptionSupport.create(e);
                     }
-                    getEndpoint().advance();
                 } else {
                     LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                     incoming = null;
@@ -402,12 +401,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             } else {
                 // We have exhausted the locally queued messages on this link.
                 // Check if we tried to stop and have now run out of credit.
-                if(stopRequest != null) {
-                    if(getEndpoint().getRemoteCredit() <= 0)
-                    {
-                        stopRequest.onSuccess();
-                        stopRequest = null;
-                    }
+                if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) {
+                    stopRequest.onSuccess();
+                    stopRequest = null;
                 }
             }
         } while (incoming != null);
@@ -416,9 +412,22 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private void processDelivery(Delivery incoming) throws Exception {
+
+        Message amqpMessage = decodeIncomingMessage(incoming);
+        long deliveryCount = amqpMessage.getDeliveryCount();
+        int maxRedeliveries = getJmsResource().getRedeliveryPolicy().getMaxRedeliveries();
+
+        if (maxRedeliveries >= 0 && deliveryCount > maxRedeliveries) {
+            LOG.trace("{} rejecting delivery that exceeds max redelivery count. {}", this, amqpMessage.getMessageId());
+            deliveryFailed(incoming);
+            return;
+        } else {
+            getEndpoint().advance();
+        }
+
         JmsMessage message = null;
         try {
-            message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming));
+            message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage);
         } catch (Exception e) {
             LOG.warn("Error on transform: {}", e.getMessage());
             // TODO - We could signal provider error but not sure we want to fail
@@ -426,7 +435,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        In the future once the JMS mapping is complete we should be
             //        able to convert everything to some message even if its just
             //        a bytes messages as a fall back.
-            deliveryFailed(incoming, true);
+            deliveryFailed(incoming);
             return;
         }
 
@@ -449,7 +458,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     protected long getNextIncomingSequenceNumber() {
-        return _incomingSequence.incrementAndGet();
+        return incomingSequence.incrementAndGet();
     }
 
     @Override
@@ -498,15 +507,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         return "AmqpConsumer { " + this.resource.getConsumerId() + " }";
     }
 
-    protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+    protected void deliveryFailed(Delivery incoming) {
         Modified disposition = new Modified();
         disposition.setUndeliverableHere(true);
         disposition.setDeliveryFailed(true);
         incoming.disposition(disposition);
         incoming.settle();
-        if (expandCredit) {
-            getEndpoint().flow(1);
-        }
+        sendFlowIfNeeded();
     }
 
     protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
index c645920..75317cb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
@@ -344,6 +344,35 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase {
         assertEquals("Properties were not equal", props, props2);
     }
 
+    /**
+     * The redelivery policy is maintained in a child-object, which we extract the properties from
+     * when serializing the factory. Ensure this functions by doing a round trip on a factory
+     * configured with some new redelivery configuration via the URI.
+     */
+    @Test
+    public void testSerializeThenDeserializeMaintainsRedeliveryPolicy() throws Exception {
+        String maxRedeliveryValue = "5";
+        String maxRedeliveryKey = "redeliveryPolicy.maxRedeliveries";
+        String uri = "amqp://localhost:1234?jms." + maxRedeliveryKey + "=" + maxRedeliveryValue;
+
+        JmsConnectionFactory cf = new JmsConnectionFactory(uri);
+        Map<String, String> props = cf.getProperties();
+
+        assertTrue("Props dont contain expected redelivery policy change", props.containsKey(maxRedeliveryKey));
+        assertEquals("Unexpected value", maxRedeliveryValue, props.get(maxRedeliveryKey));
+
+        Object roundTripped = roundTripSerialize(cf);
+
+        assertNotNull("Null object returned", roundTripped);
+        assertEquals("Unexpected type", JmsConnectionFactory.class, roundTripped.getClass());
+
+        Map<String, String> props2 = ((JmsConnectionFactory)roundTripped).getProperties();
+        assertTrue("Props dont contain expected redelivery policy change", props2.containsKey(maxRedeliveryKey));
+        assertEquals("Unexpected value", maxRedeliveryValue, props2.get(maxRedeliveryKey));
+
+        assertEquals("Properties were not equal", props, props2);
+    }
+
     @Test
     public void testSerializeTwoConnectionFactories() throws Exception {
         String uri = "amqp://localhost:1234";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 231a8de..fe5480d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -105,7 +104,7 @@ public class JmsConnectionTest {
 
         assertNotSame(newPolicy, connection.getPrefetchPolicy());
         connection.setPrefetchPolicy(newPolicy);
-        assertSame(newPolicy, connection.getPrefetchPolicy());
+        assertEquals(newPolicy, connection.getPrefetchPolicy());
     }
 
     @Test(timeout=30000)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index f6bd0d8..e87c34d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -66,6 +66,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
 import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
@@ -898,6 +899,41 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=5000)
+    public void testIncomingMessageExceedsMaxRedeliveries() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final int COUNT = 5;
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            ((JmsConnection) connection).getRedeliveryPolicy().setMaxRedeliveries(1);
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            HeaderDescribedType header = new HeaderDescribedType();
+            header.setDeliveryCount(new UnsignedInteger(2));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("content"), COUNT);
+
+            for (int i = 0; i < COUNT; i++) {
+                // Then expect an *settled* Modified disposition that rejects each message once
+                ModifiedMatcher modified = new ModifiedMatcher();
+                modified.withDeliveryFailed(equalTo(true));
+                modified.withUndeliverableHere(equalTo(true));
+                testPeer.expectDisposition(true, modified);
+            }
+
+            session.createConsumer(queue);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=5000)
     public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
new file mode 100644
index 0000000..5bd4d1e
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * test redelivery policy application in a TX session.
+ */
+public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport {
+
+    @Override
+    public String getAmqpConnectionURIOptions() {
+        return "jms.redeliveryPolicy.maxRedeliveries=5";
+    }
+
+    @Test(timeout = 30000)
+    public void testConsumeAndRollbackWithMaxRedeliveries() throws Exception {
+        final int MAX_REDELIVERIES = 5;
+        final int MSG_COUNT = 5;
+
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        sendMessages(connection, queue, MSG_COUNT);
+
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        // Consume the message for the first time.
+        Message incoming = null;
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            incoming = consumer.receive(2000);
+            assertNotNull(incoming);
+            assertFalse(incoming.getJMSRedelivered());
+            assertTrue(incoming instanceof TextMessage);
+        }
+        session.rollback();
+
+        for (int i = 0; i < MAX_REDELIVERIES; ++i) {
+            LOG.info("Queue size before consume is: {}", queueView.getQueueSize());
+            assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+            for (int j = 0; j < MSG_COUNT; ++j) {
+                incoming = consumer.receive(2000);
+                assertNotNull(incoming);
+                assertTrue(incoming.getJMSRedelivered());
+                assertTrue(incoming instanceof TextMessage);
+            }
+
+            assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+            session.rollback();
+            LOG.info("Queue size after session rollback is: {}", queueView.getQueueSize());
+        }
+
+        assertTrue("Message should get DLQ'd", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        }));
+
+        QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ");
+        assertEquals(MSG_COUNT, dlq.getQueueSize());
+
+        assertNull(consumer.receive(50));
+
+        session.commit();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org