You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/20 19:00:24 UTC

[2/2] qpid-jms git commit: QPIDJMS-177 Refine the policy object used by the message consumer and producer to allow for finer grained control with the ability for the user to implement and configure their own policy instances.

QPIDJMS-177 Refine the policy object used by the message consumer and
producer to allow for finer grained control with the ability for the user
to implement and configure their own policy instances.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b5f00d23
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b5f00d23
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b5f00d23

Branch: refs/heads/master
Commit: b5f00d23a872e1b4cf1fb3fabb00559bffc5eae0
Parents: ff7f49f
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 20 15:00:06 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 20 15:00:06 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   6 +-
 .../apache/qpid/jms/JmsConnectionFactory.java   |  12 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  32 +--
 .../org/apache/qpid/jms/JmsMessageProducer.java |   2 +-
 .../org/apache/qpid/jms/JmsPrefetchPolicy.java  | 209 ---------------
 .../org/apache/qpid/jms/JmsPresettlePolicy.java | 260 -------------------
 .../apache/qpid/jms/JmsRedeliveryPolicy.java    |  92 -------
 .../java/org/apache/qpid/jms/JmsSession.java    |   4 +-
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |  15 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |   2 +-
 .../jms/policy/JmsDefaultPrefetchPolicy.java    | 227 ++++++++++++++++
 .../jms/policy/JmsDefaultPresettlePolicy.java   | 236 +++++++++++++++++
 .../jms/policy/JmsDefaultRedeliveryPolicy.java  | 100 +++++++
 .../qpid/jms/policy/JmsPrefetchPolicy.java      |  51 ++++
 .../qpid/jms/policy/JmsPresettlePolicy.java     |  64 +++++
 .../qpid/jms/policy/JmsRedeliveryPolicy.java    |  44 ++++
 .../qpid/jms/JmsConnectionFactoryTest.java      |  13 +-
 .../org/apache/qpid/jms/JmsConnectionTest.java  |   3 +-
 .../apache/qpid/jms/JmsPrefetchPolicyTest.java  |  52 ++--
 .../integration/ConsumerIntegrationTest.java    |  15 +-
 .../MessageExpirationIntegrationTest.java       |   6 +-
 .../QueueBrowserIntegrationTest.java            |  45 ++--
 .../jms/integration/SessionIntegrationTest.java |  13 +-
 .../TransactionsIntegrationTest.java            |  20 +-
 .../failover/FailoverIntegrationTest.java       |  16 +-
 .../qpid/jms/consumer/JmsQueueBrowserTest.java  |   7 +-
 .../qpid/jms/consumer/JmsZeroPrefetchTest.java  |  19 +-
 .../qpid/jms/failover/JmsFailoverTest.java      |  13 +-
 28 files changed, 860 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 5f757c7..38aa2fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -21,7 +21,6 @@ import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -68,6 +67,9 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
@@ -111,7 +113,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     private final AtomicLong tempDestIdGenerator = new AtomicLong();
     private final AtomicLong transactionIdGenerator = new AtomicLong();
 
-    private final ConcurrentMap<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>();
+    private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>();
 
     protected JmsConnection(final String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 9fb708d..f874193 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -35,6 +35,12 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.jndi.JNDIStorable;
 import org.apache.qpid.jms.message.JmsMessageIDBuilder;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFactory;
 import org.apache.qpid.jms.util.IdGenerator;
@@ -84,9 +90,9 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     private String connectionIDPrefix;
     private ExceptionListener exceptionListener;
 
-    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
-    private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
-    private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
+    private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
+    private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
+    private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
     private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
 
     public JmsConnectionFactory() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index d63af7c..fd74ffe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -34,6 +34,8 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
@@ -84,7 +86,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             this.messageQueue = new FifoMessageQueue();
         }
 
-        JmsPrefetchPolicy policy = connection.getPrefetchPolicy();
+        JmsPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
         JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy();
 
         consumerInfo = new JmsConsumerInfo(consumerId);
@@ -95,10 +97,11 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         consumerInfo.setAcknowledgementMode(acknowledgementMode);
         consumerInfo.setNoLocal(noLocal);
         consumerInfo.setBrowser(isBrowser());
-        consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+        consumerInfo.setPrefetchSize(
+            prefetchPolicy.getConfiguredPrefetch(session, destination, isDurableSubscription(), isBrowser()));
         consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
         consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry());
-        consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(destination, session));
+        consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
 
         session.getConnection().createResource(consumerInfo);
     }
@@ -305,8 +308,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
         JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
         return redeliveryPolicy != null &&
-               redeliveryPolicy.getMaxRedeliveries() != JmsRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES &&
-               redeliveryPolicy.getMaxRedeliveries() < envelope.getRedeliveryCount();
+               redeliveryPolicy.getMaxRedeliveries(getDestination()) >= 0 &&
+               redeliveryPolicy.getMaxRedeliveries(getDestination()) < envelope.getRedeliveryCount();
     }
 
     protected void checkClosed() throws IllegalStateException {
@@ -632,25 +635,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
-    private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) {
-        int prefetch = 0;
-        if (destination.isTopic()) {
-            if (isDurableSubscription()) {
-                prefetch = policy.getDurableTopicPrefetch();
-            } else {
-                prefetch = policy.getTopicPrefetch();
-            }
-        } else {
-            if (isBrowser()) {
-                prefetch = policy.getQueueBrowserPrefetch();
-            } else {
-                prefetch = policy.getQueuePrefetch();
-            }
-        }
-
-        return prefetch;
-    }
-
     private final class MessageDeliverTask implements Runnable {
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index ea2d2c9..c077230 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -57,7 +57,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         this.anonymousProducer = destination == null;
         this.producerInfo = new JmsProducerInfo(producerId);
         this.producerInfo.setDestination(destination);
-        this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(destination, session));
+        this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(session, destination));
 
         session.getConnection().createResource(producerInfo);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
deleted file mode 100644
index 306178d..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Defines the prefetch message policies for different types of consumers
- */
-public class JmsPrefetchPolicy {
-
-    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
-    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
-    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = DEFAULT_QUEUE_PREFETCH;
-    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = DEFAULT_QUEUE_PREFETCH;
-    public static final int DEFAULT_TOPIC_PREFETCH = DEFAULT_QUEUE_PREFETCH;
-
-    private static final Logger LOG = LoggerFactory.getLogger(JmsPrefetchPolicy.class);
-
-    private int queuePrefetch;
-    private int queueBrowserPrefetch;
-    private int topicPrefetch;
-    private int durableTopicPrefetch;
-    private int maxPrefetchSize = MAX_PREFETCH_SIZE;
-
-    /**
-     * Initialize default prefetch policies
-     */
-    public JmsPrefetchPolicy() {
-        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
-        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
-        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
-        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
-    }
-
-    /**
-     * Creates a new JmsPrefetchPolicy instance copied from the source policy.
-     *
-     * @param source
-     *      The policy instance to copy values from.
-     */
-    public JmsPrefetchPolicy(JmsPrefetchPolicy source) {
-        this.queuePrefetch = source.getQueuePrefetch();
-        this.queueBrowserPrefetch = source.getQueueBrowserPrefetch();
-        this.topicPrefetch = source.getTopicPrefetch();
-        this.durableTopicPrefetch = source.getDurableTopicPrefetch();
-        this.maxPrefetchSize = source.getMaxPrefetchSize();
-    }
-
-    /**
-     * Copy this policy into a newly allocated instance.
-     *
-     * @return a new JmsPrefetchPolicy that is a copy of this one.
-     */
-    public JmsPrefetchPolicy copy() {
-        return new JmsPrefetchPolicy(this);
-    }
-
-    /**
-     * @return Returns the durableTopicPrefetch.
-     */
-    public int getDurableTopicPrefetch() {
-        return durableTopicPrefetch;
-    }
-
-    /**
-     * Sets the durable topic prefetch value, this value is limited by the max
-     * prefetch size setting.
-     *
-     * @param durableTopicPrefetch
-     *        The durableTopicPrefetch to set.
-     */
-    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
-        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
-    }
-
-    /**
-     * @return Returns the queuePrefetch.
-     */
-    public int getQueuePrefetch() {
-        return queuePrefetch;
-    }
-
-    /**
-     * @param queuePrefetch
-     *        The queuePrefetch to set.
-     */
-    public void setQueuePrefetch(int queuePrefetch) {
-        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
-    }
-
-    /**
-     * @return Returns the queueBrowserPrefetch.
-     */
-    public int getQueueBrowserPrefetch() {
-        return queueBrowserPrefetch;
-    }
-
-    /**
-     * @param queueBrowserPrefetch
-     *        The queueBrowserPrefetch to set.
-     */
-    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
-        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
-    }
-
-    /**
-     * @return Returns the topicPrefetch.
-     */
-    public int getTopicPrefetch() {
-        return topicPrefetch;
-    }
-
-    /**
-     * @param topicPrefetch
-     *        The topicPrefetch to set.
-     */
-    public void setTopicPrefetch(int topicPrefetch) {
-        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
-    }
-
-    /**
-     * Gets the currently configured max prefetch size value.
-     * @return the currently configured max prefetch value.
-     */
-    public int getMaxPrefetchSize() {
-        return maxPrefetchSize;
-    }
-
-    /**
-     * Sets the maximum prefetch size value.
-     *
-     * @param maxPrefetchSize
-     *        The maximum allowed value for any of the prefetch size options.
-     */
-    public void setMaxPrefetchSize(int maxPrefetchSize) {
-        this.maxPrefetchSize = maxPrefetchSize;
-    }
-
-    /**
-     * Sets the prefetch values for all options in this policy to the set limit.  If the value
-     * given is larger than the max prefetch value of this policy the new limit will be capped
-     * at the max prefetch value.
-     *
-     * @param prefetch
-     *      The prefetch value to apply to all prefetch limits.
-     */
-    public void setAll(int prefetch) {
-        this.durableTopicPrefetch = getMaxPrefetchLimit(prefetch);
-        this.queueBrowserPrefetch = getMaxPrefetchLimit(prefetch);
-        this.queuePrefetch = getMaxPrefetchLimit(prefetch);
-        this.topicPrefetch = getMaxPrefetchLimit(prefetch);
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + durableTopicPrefetch;
-        result = prime * result + maxPrefetchSize;
-        result = prime * result + queueBrowserPrefetch;
-        result = prime * result + queuePrefetch;
-        result = prime * result + topicPrefetch;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-
-        JmsPrefetchPolicy other = (JmsPrefetchPolicy) obj;
-
-        return this.queuePrefetch == other.queuePrefetch &&
-               this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
-               this.topicPrefetch == other.topicPrefetch &&
-               this.durableTopicPrefetch == other.durableTopicPrefetch;
-    }
-
-    private int getMaxPrefetchLimit(int value) {
-        int result = Math.min(value, maxPrefetchSize);
-        if (result < value) {
-            LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
deleted file mode 100644
index 30ab9ea..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-/**
- * Policy object that allows for configuration of options that affect when
- * a JMS MessageProducer will result in AMQP presettled message sends.
- */
-public class JmsPresettlePolicy {
-
-    private boolean presettleAll;
-
-    private boolean presettleProducers;
-    private boolean presettleTopicProducers;
-    private boolean presettleQueueProducers;
-    private boolean presettleTransactedProducers;
-
-    private boolean presettleConsumers;
-    private boolean presettleTopicConsumers;
-    private boolean presettleQueueConsumers;
-
-    public JmsPresettlePolicy() {
-    }
-
-    public JmsPresettlePolicy(JmsPresettlePolicy source) {
-        this.presettleAll = source.presettleAll;
-        this.presettleProducers = source.presettleProducers;
-        this.presettleTopicProducers = source.presettleTopicProducers;
-        this.presettleQueueProducers = source.presettleQueueProducers;
-        this.presettleTransactedProducers = source.presettleTransactedProducers;
-        this.presettleConsumers = source.presettleConsumers;
-        this.presettleTopicConsumers = source.presettleTopicConsumers;
-        this.presettleQueueConsumers = source.presettleQueueConsumers;
-    }
-
-    public JmsPresettlePolicy copy() {
-        return new JmsPresettlePolicy(this);
-    }
-
-    /**
-     * @return the presettleAll setting for this policy
-     */
-    public boolean isPresettleAll() {
-        return presettleAll;
-    }
-
-    /**
-     * Sets the presettle all sends option.  When true all MessageProducers
-     * will send their messages presettled.
-     *
-     * @param presettleAll
-     *      the presettleAll value to apply.
-     */
-    public void setPresettleAll(boolean presettleAll) {
-        this.presettleAll = presettleAll;
-    }
-
-    /**
-     * @return the presettleProducers setting for this policy.
-     */
-    public boolean isPresettleProducers() {
-        return presettleProducers;
-    }
-
-    /**
-     * Sets the the presettle all sends option.  When true all MessageProducers that
-     * are created will send their messages as settled.
-     *
-     * @param presettleProducers
-     *      the presettleProducers value to apply.
-     */
-    public void setPresettleProducers(boolean presettleProducers) {
-        this.presettleProducers = presettleProducers;
-    }
-
-    /**
-     * @return the presettleTopicProducers setting for this policy
-     */
-    public boolean isPresettleTopicProducers() {
-        return presettleTopicProducers;
-    }
-
-    /**
-     * Sets the presettle Topic sends option.  When true any MessageProducer that
-     * is created that sends to a Topic will send its messages presettled, and any
-     * anonymous MessageProducer will send Messages that are sent to a Topic as
-     * presettled as well.
-     *
-     * @param presettleTopicProducers
-     *      the presettleTopicProducers value to apply.
-     */
-    public void setPresettleTopicProducers(boolean presettleTopicProducers) {
-        this.presettleTopicProducers = presettleTopicProducers;
-    }
-
-    /**
-     * @return the presettleQueueSends setting for this policy
-     */
-    public boolean isPresettleQueueProducers() {
-        return presettleQueueProducers;
-    }
-
-    /**
-     * Sets the presettle Queue sends option.  When true any MessageProducer that
-     * is created that sends to a Queue will send its messages presettled, and any
-     * anonymous MessageProducer will send Messages that are sent to a Queue as
-     * presettled as well.
-     *
-     * @param presettleQueueProducers
-     *      the presettleQueueSends value to apply.
-     */
-    public void setPresettleQueueProducers(boolean presettleQueueProducers) {
-        this.presettleQueueProducers = presettleQueueProducers;
-    }
-
-    /**
-     * @return the presettleTransactedSends setting for this policy
-     */
-    public boolean isPresettleTransactedProducers() {
-        return presettleTransactedProducers;
-    }
-
-    /**
-     * Sets the presettle in transactions option.  When true any MessageProducer that is
-     * operating inside of a transacted session will send its messages presettled.
-     *
-     * @param presettleTransactedProducers the presettleTransactedSends to set
-     */
-    public void setPresettleTransactedProducers(boolean presettleTransactedProducers) {
-        this.presettleTransactedProducers = presettleTransactedProducers;
-    }
-
-    /**
-     * Determines when a producer will send message presettled.
-     * <p>
-     * Called when the a producer is being created to determine whether the producer will
-     * be configured to send all its message as presettled or not.
-     * <p>
-     * For an anonymous producer this method is called on each send to allow the policy to
-     * be applied to the target destination that the message will be sent to.
-     *
-     * @param destination
-     *      the destination that the producer will be sending to.
-     * @param session
-     *      the session that owns the producer.
-     *
-     * @return true if the producer should send presettled.
-     */
-    public boolean isProducerPresttled(JmsDestination destination, JmsSession session) {
-
-        if (presettleAll || presettleProducers) {
-            return true;
-        } else if (session.isTransacted() && presettleTransactedProducers) {
-            return true;
-        } else if (destination != null && destination.isQueue() && presettleQueueProducers) {
-            return true;
-        } else if (destination != null && destination.isTopic() && presettleTopicProducers) {
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return the presettleConsumers configuration value for this policy.
-     */
-    public boolean isPresettleConsumers() {
-        return presettleConsumers;
-    }
-
-    /**
-     * The presettle all consumers value to apply.  When true all MessageConsumer
-     * instances created will indicate that presettled messages are requested.
-     *
-     * @param presettleConsumers
-     *      the presettleConsumers value to apply to this policy.
-     */
-    public void setPresettleConsumers(boolean presettleConsumers) {
-        this.presettleConsumers = presettleConsumers;
-    }
-
-    /**
-     * @return the presettleTopicConsumers setting for this policy.
-     */
-    public boolean isPresettleTopicConsumers() {
-        return presettleTopicConsumers;
-    }
-
-    /**
-     * The presettle Topic consumers value to apply.  When true any MessageConsumer for
-     * a Topic destination will indicate that presettled messages are requested.
-     *
-     * @param presettleTopicConsumers
-     *      the presettleTopicConsumers value to apply to this policy.
-     */
-    public void setPresettleTopicConsumers(boolean presettleTopicConsumers) {
-        this.presettleTopicConsumers = presettleTopicConsumers;
-    }
-
-    /**
-     * @return the presettleQueueConsumers setting for this policy.
-     */
-    public boolean isPresettleQueueConsumers() {
-        return presettleQueueConsumers;
-    }
-
-    /**
-     * The presettle Queue consumers value to apply.  When true any MessageConsumer for
-     * a Queue destination will indicate that presettled messages are requested.
-     *
-     * @param presettleQueueConsumers
-     *      the presettleQueueConsumers value to apply to this policy.
-     */
-    public void setPresettleQueueConsumers(boolean presettleQueueConsumers) {
-        this.presettleQueueConsumers = presettleQueueConsumers;
-    }
-
-    /**
-     * Determines when a consumer will be created with the settlement mode set to presettled.
-     * <p>
-     * Called when the a consumer is being created to determine whether the consumer will
-     * be configured to request that the remote sends it message that are presettled.
-     * <p>
-     *
-     * @param destination
-     *      the destination that the consumer will be listening to.
-     * @param session
-     *      the session that owns the consumer being created.
-     *
-     * @return true if the producer should send presettled.
-     */
-    public boolean isConsumerPresttled(JmsDestination destination, JmsSession session) {
-
-        if (session.isTransacted()) {
-            return false;
-        } else if (presettleAll || presettleConsumers) {
-            return true;
-        } else if (destination != null && destination.isQueue() && presettleQueueConsumers) {
-            return true;
-        } else if (destination != null && destination.isTopic() && presettleTopicConsumers) {
-            return true;
-        }
-
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
deleted file mode 100644
index c57e238..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-/**
- * Defines the policy used to manage redelivered and recovered Messages.
- */
-public class JmsRedeliveryPolicy {
-
-    public static final int DEFAULT_MAX_REDELIVERIES = -1;
-
-    private int maxRedeliveries;
-
-    public JmsRedeliveryPolicy() {
-        maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
-    }
-
-    public JmsRedeliveryPolicy(JmsRedeliveryPolicy source) {
-        maxRedeliveries = source.maxRedeliveries;
-    }
-
-    public JmsRedeliveryPolicy copy() {
-        return new JmsRedeliveryPolicy(this);
-    }
-
-    /**
-     * Returns the configured maximum redeliveries that a message will be
-     * allowed to have before it is rejected by this client.
-     *
-     * @return the maxRedeliveries
-     *         the maximum number of redeliveries allowed before a message is rejected.
-     */
-    public int getMaxRedeliveries() {
-        return maxRedeliveries;
-    }
-
-    /**
-     * Configures the maximum number of time a message can be redelivered before it
-     * will be rejected by this client.
-     *
-     * The default value of (-1) disables max redelivery processing.
-     *
-     * @param maxRedeliveries the maxRedeliveries to set
-     */
-    public void setMaxRedeliveries(int maxRedeliveries) {
-        this.maxRedeliveries = maxRedeliveries;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + maxRedeliveries;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        if (obj == null) {
-            return false;
-        }
-
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-
-        JmsRedeliveryPolicy other = (JmsRedeliveryPolicy) obj;
-        if (maxRedeliveries != other.maxRedeliveries) {
-            return false;
-        }
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index ea57503..66d9539 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -69,6 +69,8 @@ import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
@@ -711,7 +713,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             envelope.setDispatchId(messageSequence);
 
             if (producer.isAnonymous()) {
-                envelope.setPresettle(presettlePolicy.isProducerPresttled(destination, this));
+                envelope.setPresettle(presettlePolicy.isProducerPresttled(this, destination));
             }
 
             transactionContext.send(connection, envelope);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 3af306d..14661ca 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -19,10 +19,13 @@ package org.apache.qpid.jms.meta;
 import java.net.URI;
 import java.nio.charset.Charset;
 
-import org.apache.qpid.jms.JmsPrefetchPolicy;
-import org.apache.qpid.jms.JmsPresettlePolicy;
-import org.apache.qpid.jms.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.message.JmsMessageIDBuilder;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 
 /**
  * Meta object that contains the JmsConnection identification and configuration
@@ -59,9 +62,9 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
     private String queuePrefix = null;
     private String topicPrefix = null;
 
-    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
-    private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
-    private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
+    private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
+    private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
+    private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
     private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
 
     private volatile byte[] encodedUserId;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index f5b791a..ef8a939 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -17,7 +17,7 @@
 package org.apache.qpid.jms.meta;
 
 import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 
 public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsumerInfo> {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPrefetchPolicy.java
new file mode 100644
index 0000000..c01da2c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPrefetchPolicy.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the prefetch message policies for different types of consumers
+ */
+public class JmsDefaultPrefetchPolicy implements JmsPrefetchPolicy {
+
+    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
+    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = DEFAULT_QUEUE_PREFETCH;
+    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = DEFAULT_QUEUE_PREFETCH;
+    public static final int DEFAULT_TOPIC_PREFETCH = DEFAULT_QUEUE_PREFETCH;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsDefaultPrefetchPolicy.class);
+
+    private int queuePrefetch;
+    private int queueBrowserPrefetch;
+    private int topicPrefetch;
+    private int durableTopicPrefetch;
+    private int maxPrefetchSize = MAX_PREFETCH_SIZE;
+
+    /**
+     * Initialize default prefetch policies
+     */
+    public JmsDefaultPrefetchPolicy() {
+        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+    }
+
+    /**
+     * Creates a new JmsPrefetchPolicy instance copied from the source policy.
+     *
+     * @param source
+     *      The policy instance to copy values from.
+     */
+    public JmsDefaultPrefetchPolicy(JmsDefaultPrefetchPolicy source) {
+        this.queuePrefetch = source.getQueuePrefetch();
+        this.queueBrowserPrefetch = source.getQueueBrowserPrefetch();
+        this.topicPrefetch = source.getTopicPrefetch();
+        this.durableTopicPrefetch = source.getDurableTopicPrefetch();
+        this.maxPrefetchSize = source.getMaxPrefetchSize();
+    }
+
+    @Override
+    public JmsDefaultPrefetchPolicy copy() {
+        return new JmsDefaultPrefetchPolicy(this);
+    }
+
+    @Override
+    public int getConfiguredPrefetch(JmsSession session, JmsDestination destination, boolean durable, boolean browser) {
+        int prefetch = 0;
+        if (destination.isTopic()) {
+            if (durable) {
+                prefetch = getDurableTopicPrefetch();
+            } else {
+                prefetch = getTopicPrefetch();
+            }
+        } else {
+            if (browser) {
+                prefetch = getQueueBrowserPrefetch();
+            } else {
+                prefetch = getQueuePrefetch();
+            }
+        }
+
+        return prefetch;
+    }
+
+    /**
+     * @return Returns the durableTopicPrefetch.
+     */
+    public int getDurableTopicPrefetch() {
+        return durableTopicPrefetch;
+    }
+
+    /**
+     * Sets the durable topic prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param durableTopicPrefetch
+     *        The durableTopicPrefetch to set.
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
+    }
+
+    /**
+     * @return Returns the queuePrefetch.
+     */
+    public int getQueuePrefetch() {
+        return queuePrefetch;
+    }
+
+    /**
+     * @param queuePrefetch
+     *        The queuePrefetch to set.
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
+    }
+
+    /**
+     * @return Returns the queueBrowserPrefetch.
+     */
+    public int getQueueBrowserPrefetch() {
+        return queueBrowserPrefetch;
+    }
+
+    /**
+     * @param queueBrowserPrefetch
+     *        The queueBrowserPrefetch to set.
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
+    }
+
+    /**
+     * @return Returns the topicPrefetch.
+     */
+    public int getTopicPrefetch() {
+        return topicPrefetch;
+    }
+
+    /**
+     * @param topicPrefetch
+     *        The topicPrefetch to set.
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
+    }
+
+    /**
+     * Gets the currently configured max prefetch size value.
+     * @return the currently configured max prefetch value.
+     */
+    public int getMaxPrefetchSize() {
+        return maxPrefetchSize;
+    }
+
+    /**
+     * Sets the maximum prefetch size value.
+     *
+     * @param maxPrefetchSize
+     *        The maximum allowed value for any of the prefetch size options.
+     */
+    public void setMaxPrefetchSize(int maxPrefetchSize) {
+        this.maxPrefetchSize = maxPrefetchSize;
+    }
+
+    /**
+     * Sets the prefetch values for all options in this policy to the set limit.  If the value
+     * given is larger than the max prefetch value of this policy the new limit will be capped
+     * at the max prefetch value.
+     *
+     * @param prefetch
+     *      The prefetch value to apply to all prefetch limits.
+     */
+    public void setAll(int prefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(prefetch);
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(prefetch);
+        this.queuePrefetch = getMaxPrefetchLimit(prefetch);
+        this.topicPrefetch = getMaxPrefetchLimit(prefetch);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + durableTopicPrefetch;
+        result = prime * result + maxPrefetchSize;
+        result = prime * result + queueBrowserPrefetch;
+        result = prime * result + queuePrefetch;
+        result = prime * result + topicPrefetch;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        JmsDefaultPrefetchPolicy other = (JmsDefaultPrefetchPolicy) obj;
+
+        // Every field that feeds hashCode() must be compared here, including
+        // maxPrefetchSize, otherwise equal policies could produce different hashes.
+        return this.queuePrefetch == other.queuePrefetch &&
+               this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
+               this.topicPrefetch == other.topicPrefetch &&
+               this.durableTopicPrefetch == other.durableTopicPrefetch &&
+               this.maxPrefetchSize == other.maxPrefetchSize;
+    }
+
+    private int getMaxPrefetchLimit(int value) { // caps value at the configured maxPrefetchSize
+        int result = Math.min(value, maxPrefetchSize);
+        if (result < value) { // report the limit actually applied, not the MAX_PREFETCH_SIZE default
+            LOG.warn("maximum prefetch limit has been reset from {} to {}", value, result);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
new file mode 100644
index 0000000..95ca57c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+
+/**
+ * Policy object that allows for configuration of options that affect when a JMS
+ * MessageProducer sends presettled or a MessageConsumer requests presettled delivery.
+ */
+public class JmsDefaultPresettlePolicy implements JmsPresettlePolicy {
+
+    private boolean presettleAll;
+
+    private boolean presettleProducers;
+    private boolean presettleTopicProducers;
+    private boolean presettleQueueProducers;
+    private boolean presettleTransactedProducers;
+
+    private boolean presettleConsumers;
+    private boolean presettleTopicConsumers;
+    private boolean presettleQueueConsumers;
+
+    public JmsDefaultPresettlePolicy() {
+    }
+
+    public JmsDefaultPresettlePolicy(JmsDefaultPresettlePolicy source) {
+        this.presettleAll = source.presettleAll;
+        this.presettleProducers = source.presettleProducers;
+        this.presettleTopicProducers = source.presettleTopicProducers;
+        this.presettleQueueProducers = source.presettleQueueProducers;
+        this.presettleTransactedProducers = source.presettleTransactedProducers;
+        this.presettleConsumers = source.presettleConsumers;
+        this.presettleTopicConsumers = source.presettleTopicConsumers;
+        this.presettleQueueConsumers = source.presettleQueueConsumers;
+    }
+
+    @Override
+    public JmsPresettlePolicy copy() {
+        return new JmsDefaultPresettlePolicy(this);
+    }
+
+    @Override
+    public boolean isConsumerPresttled(JmsSession session, JmsDestination destination) {
+
+        if (session.isTransacted()) {
+            return false;
+        } else if (presettleAll || presettleConsumers) {
+            return true;
+        } else if (destination != null && destination.isQueue() && presettleQueueConsumers) {
+            return true;
+        } else if (destination != null && destination.isTopic() && presettleTopicConsumers) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isProducerPresttled(JmsSession session, JmsDestination destination) {
+
+        if (presettleAll || presettleProducers) {
+            return true;
+        } else if (session.isTransacted() && presettleTransactedProducers) {
+            return true;
+        } else if (destination != null && destination.isQueue() && presettleQueueProducers) {
+            return true;
+        } else if (destination != null && destination.isTopic() && presettleTopicProducers) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return the presettleAll setting for this policy
+     */
+    public boolean isPresettleAll() {
+        return presettleAll;
+    }
+
+    /**
+     * Sets the presettle all option.  When true every MessageProducer sends presettled
+     * and every MessageConsumer on a non-transacted session requests presettled messages.
+     *
+     * @param presettleAll
+     *      the presettleAll value to apply.
+     */
+    public void setPresettleAll(boolean presettleAll) {
+        this.presettleAll = presettleAll;
+    }
+
+    /**
+     * @return the presettleProducers setting for this policy.
+     */
+    public boolean isPresettleProducers() {
+        return presettleProducers;
+    }
+
+    /**
+     * Sets the presettle all producers option.  When true all MessageProducers that
+     * are created will send their messages presettled.
+     *
+     * @param presettleProducers
+     *      the presettleProducers value to apply.
+     */
+    public void setPresettleProducers(boolean presettleProducers) {
+        this.presettleProducers = presettleProducers;
+    }
+
+    /**
+     * @return the presettleTopicProducers setting for this policy
+     */
+    public boolean isPresettleTopicProducers() {
+        return presettleTopicProducers;
+    }
+
+    /**
+     * Sets the presettle Topic sends option.  When true any MessageProducer that
+     * is created that sends to a Topic will send its messages presettled, and any
+     * anonymous MessageProducer will send Messages that are sent to a Topic as
+     * presettled as well.
+     *
+     * @param presettleTopicProducers
+     *      the presettleTopicProducers value to apply.
+     */
+    public void setPresettleTopicProducers(boolean presettleTopicProducers) {
+        this.presettleTopicProducers = presettleTopicProducers;
+    }
+
+    /**
+     * @return the presettleQueueProducers setting for this policy.
+     */
+    public boolean isPresettleQueueProducers() {
+        return presettleQueueProducers;
+    }
+
+    /**
+     * Sets the presettle Queue producers option.  When true any MessageProducer that
+     * is created that sends to a Queue will send its messages presettled, and any
+     * anonymous MessageProducer will send Messages that are sent to a Queue as
+     * presettled as well.
+     *
+     * @param presettleQueueProducers
+     *      the presettleQueueProducers value to apply.
+     */
+    public void setPresettleQueueProducers(boolean presettleQueueProducers) {
+        this.presettleQueueProducers = presettleQueueProducers;
+    }
+
+    /**
+     * @return the presettleTransactedProducers setting for this policy.
+     */
+    public boolean isPresettleTransactedProducers() {
+        return presettleTransactedProducers;
+    }
+
+    /**
+     * Sets the presettle in transactions option.  When true any MessageProducer that is
+     * operating inside of a transacted session will send its messages presettled.
+     *
+     * @param presettleTransactedProducers the presettleTransactedProducers value to apply.
+     */
+    public void setPresettleTransactedProducers(boolean presettleTransactedProducers) {
+        this.presettleTransactedProducers = presettleTransactedProducers;
+    }
+
+    /**
+     * @return the presettleConsumers configuration value for this policy.
+     */
+    public boolean isPresettleConsumers() {
+        return presettleConsumers;
+    }
+
+    /**
+     * The presettle all consumers value to apply.  When true all MessageConsumer
+     * instances created will indicate that presettled messages are requested.
+     *
+     * @param presettleConsumers
+     *      the presettleConsumers value to apply to this policy.
+     */
+    public void setPresettleConsumers(boolean presettleConsumers) {
+        this.presettleConsumers = presettleConsumers;
+    }
+
+    /**
+     * @return the presettleTopicConsumers setting for this policy.
+     */
+    public boolean isPresettleTopicConsumers() {
+        return presettleTopicConsumers;
+    }
+
+    /**
+     * The presettle Topic consumers value to apply.  When true any MessageConsumer for
+     * a Topic destination will indicate that presettled messages are requested.
+     *
+     * @param presettleTopicConsumers
+     *      the presettleTopicConsumers value to apply to this policy.
+     */
+    public void setPresettleTopicConsumers(boolean presettleTopicConsumers) {
+        this.presettleTopicConsumers = presettleTopicConsumers;
+    }
+
+    /**
+     * @return the presettleQueueConsumers setting for this policy.
+     */
+    public boolean isPresettleQueueConsumers() {
+        return presettleQueueConsumers;
+    }
+
+    /**
+     * The presettle Queue consumers value to apply.  When true any MessageConsumer for
+     * a Queue destination will indicate that presettled messages are requested.
+     *
+     * @param presettleQueueConsumers
+     *      the presettleQueueConsumers value to apply to this policy.
+     */
+    public void setPresettleQueueConsumers(boolean presettleQueueConsumers) {
+        this.presettleQueueConsumers = presettleQueueConsumers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
new file mode 100644
index 0000000..bf885ea
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+
+/**
+ * Defines the policy used to manage redelivered and recovered Messages.
+ */
+public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
+
+    public static final int DEFAULT_MAX_REDELIVERIES = -1;
+
+    private int maxRedeliveries;
+
+    public JmsDefaultRedeliveryPolicy() {
+        maxRedeliveries = DEFAULT_MAX_REDELIVERIES;
+    }
+
+    public JmsDefaultRedeliveryPolicy(JmsDefaultRedeliveryPolicy source) {
+        maxRedeliveries = source.maxRedeliveries;
+    }
+
+    @Override
+    public JmsRedeliveryPolicy copy() {
+        return new JmsDefaultRedeliveryPolicy(this);
+    }
+
+    @Override
+    public int getMaxRedeliveries(JmsDestination destination) {
+        return maxRedeliveries;
+    }
+
+    /**
+     * Returns the configured maximum redeliveries that a message will be
+     * allowed to have before it is rejected by this client.
+     *
+     * @return the maxRedeliveries
+     *         the maximum number of redeliveries allowed before a message is rejected.
+     */
+    public int getMaxRedeliveries() {
+        return maxRedeliveries;
+    }
+
+    /**
+     * Configures the maximum number of times a message can be redelivered before it
+     * will be rejected by this client.
+     *
+     * The default value of (-1) disables max redelivery processing.
+     *
+     * @param maxRedeliveries the maxRedeliveries to set
+     */
+    public void setMaxRedeliveries(int maxRedeliveries) {
+        this.maxRedeliveries = maxRedeliveries;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + maxRedeliveries;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        JmsDefaultRedeliveryPolicy other = (JmsDefaultRedeliveryPolicy) obj;
+        if (maxRedeliveries != other.maxRedeliveries) {
+            return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
new file mode 100644
index 0000000..efc49de
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+
+/**
+ * Interface for all Prefetch Policy implementations.  Allows for configuration of
+ * MessageConsumer prefetch during creation.
+ */
+public interface JmsPrefetchPolicy {
+
+    /**
+     * Copy this policy into a newly allocated instance.
+     *
+     * @return a new JmsPrefetchPolicy that is a copy of this one.
+     */
+    JmsPrefetchPolicy copy();
+
+    /**
+     * Returns the prefetch value to use when creating a MessageConsumer instance.
+     *
+     * @param session
+     *      the Session that owns the MessageConsumer being created.
+     * @param destination
+     *      the Destination that the consumer will be subscribed to.
+     * @param durable
+     *      indicates if the subscription being created is a durable subscription (Topics only).
+     * @param browser
+     *      indicates if the subscription being created is a message browser (Queues only).
+     *
+     * @return the prefetch value to assign the MessageConsumer being created.
+     */
+    int getConfiguredPrefetch(JmsSession session, JmsDestination destination, boolean durable, boolean browser);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPresettlePolicy.java
new file mode 100644
index 0000000..d2a3abe
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPresettlePolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+
+/**
+ * Interface for building policy objects that control when a MessageProducer or
+ * MessageConsumer instance will operate in presettled mode.
+ */
+public interface JmsPresettlePolicy {
+
+    JmsPresettlePolicy copy();
+
+    /**
+     * Determines when a producer will send messages presettled.
+     * <p>
+     * Called when a producer is being created to determine whether the producer will
+     * be configured to send all its messages as presettled or not.
+     * <p>
+     * For an anonymous producer this method is called on each send to allow the policy to
+     * be applied to the target destination that the message will be sent to.
+     *
+     * @param session
+     *      the session that owns the producer.
+     * @param destination
+     *      the destination that the producer will be sending to.
+     *
+     * @return true if the producer should send presettled.
+     */
+    boolean isProducerPresttled(JmsSession session, JmsDestination destination);
+
+    /**
+     * Determines when a consumer will be created with the settlement mode set to presettled.
+     * <p>
+     * Called when a consumer is being created to determine whether the consumer will
+     * be configured to request that the remote sends it messages that are presettled.
+     * <p>
+     *
+     * @param session
+     *      the session that owns the consumer being created.
+     * @param destination
+     *      the destination that the consumer will be listening to.
+     *
+     * @return true if the consumer should be configured as presettled.
+     */
+    boolean isConsumerPresttled(JmsSession session, JmsDestination destination);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
new file mode 100644
index 0000000..24f0eec
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsRedeliveryPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+
+/**
+ * Interface for a Redelivery Policy object used to determine how many times a Message
+ * can be redelivered by the client before being dropped.
+ */
+public interface JmsRedeliveryPolicy {
+
+    JmsRedeliveryPolicy copy();
+
+    /**
+     * Returns the configured maximum redeliveries that a message will be
+     * allowed to have before it is rejected by this client for a given destination.
+     * <p>
+     * A return value of less than zero is treated as if there is no maximum value
+     * set.
+     *
+     * @param destination
+     *      the destination that the subscription is redelivering from.
+     *
+     * @return the maximum number of redeliveries allowed before a message is
+     *         rejected, or a value less than zero if no maximum is set.
+     */
+    int getMaxRedeliveries(JmsDestination destination);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
index 965e990..e5083cf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
@@ -37,6 +37,7 @@ import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -84,7 +85,7 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase {
         factory.setConnectTimeout(TimeUnit.SECONDS.toMillis(30));
         factory.setCloseTimeout(TimeUnit.SECONDS.toMillis(45));
 
-        factory.getPrefetchPolicy().setAll(1);
+        ((JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy()).setAll(1);
 
         JmsConnection connection = (JmsConnection) factory.createConnection();
         assertNotNull(connection);
@@ -103,10 +104,12 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase {
         assertEquals(TimeUnit.SECONDS.toMillis(30), connection.getConnectTimeout());
         assertEquals(TimeUnit.SECONDS.toMillis(45), connection.getCloseTimeout());
 
-        assertEquals(1, connection.getPrefetchPolicy().getTopicPrefetch());
-        assertEquals(1, connection.getPrefetchPolicy().getQueuePrefetch());
-        assertEquals(1, connection.getPrefetchPolicy().getQueueBrowserPrefetch());
-        assertEquals(1, connection.getPrefetchPolicy().getDurableTopicPrefetch());
+        JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy();
+
+        assertEquals(1, prefetchPolicy.getTopicPrefetch());
+        assertEquals(1, prefetchPolicy.getQueuePrefetch());
+        assertEquals(1, prefetchPolicy.getQueueBrowserPrefetch());
+        assertEquals(1, prefetchPolicy.getDurableTopicPrefetch());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 1ac4c3d..3f5b84b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -35,6 +35,7 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.provider.mock.MockProvider;
 import org.apache.qpid.jms.provider.mock.MockProviderFactory;
 import org.apache.qpid.jms.util.IdGenerator;
@@ -98,7 +99,7 @@ public class JmsConnectionTest {
     public void testReplacePrefetchPolicy() throws JMSException {
         connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
 
-        JmsPrefetchPolicy newPolicy = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy newPolicy = new JmsDefaultPrefetchPolicy();
         newPolicy.setAll(1);
 
         assertNotSame(newPolicy, connection.getPrefetchPolicy());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsPrefetchPolicyTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsPrefetchPolicyTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsPrefetchPolicyTest.java
index f58472c..79c8c74 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsPrefetchPolicyTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsPrefetchPolicyTest.java
@@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
 import org.junit.Test;
 
 /**
@@ -29,8 +31,8 @@ public class JmsPrefetchPolicyTest {
 
     @Test
     public void testHashCode() {
-        JmsPrefetchPolicy policy1 = new JmsPrefetchPolicy();
-        JmsPrefetchPolicy policy2 = new JmsPrefetchPolicy();
+        JmsPrefetchPolicy policy1 = new JmsDefaultPrefetchPolicy();
+        JmsPrefetchPolicy policy2 = new JmsDefaultPrefetchPolicy();
 
         assertTrue(policy1.hashCode() != 0);
         assertEquals(policy1.hashCode(), policy1.hashCode());
@@ -39,25 +41,25 @@ public class JmsPrefetchPolicyTest {
 
     @Test
     public void testJmsPrefetchPolicy() {
-        JmsPrefetchPolicy policy = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
 
-        assertEquals(JmsPrefetchPolicy.DEFAULT_TOPIC_PREFETCH, policy.getTopicPrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH, policy.getDurableTopicPrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH, policy.getQueuePrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH, policy.getQueueBrowserPrefetch());
-        assertEquals(JmsPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH, policy.getTopicPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH, policy.getDurableTopicPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH, policy.getQueuePrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH, policy.getQueueBrowserPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
     }
 
     @Test
     public void testJmsPrefetchPolicyJmsPrefetchPolicy() {
-        JmsPrefetchPolicy policy1 = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy1 = new JmsDefaultPrefetchPolicy();
         policy1.setTopicPrefetch(10);
         policy1.setDurableTopicPrefetch(20);
         policy1.setQueueBrowserPrefetch(30);
         policy1.setQueuePrefetch(40);
         policy1.setMaxPrefetchSize(100);
 
-        JmsPrefetchPolicy policy2 = new JmsPrefetchPolicy(policy1);
+        JmsDefaultPrefetchPolicy policy2 = new JmsDefaultPrefetchPolicy(policy1);
 
         assertEquals(policy1.getTopicPrefetch(), policy2.getTopicPrefetch());
         assertEquals(policy1.getDurableTopicPrefetch(), policy2.getDurableTopicPrefetch());
@@ -68,16 +70,16 @@ public class JmsPrefetchPolicyTest {
 
     @Test
     public void testGetMaxPrefetchSize() {
-        JmsPrefetchPolicy policy = new JmsPrefetchPolicy();
-        assertEquals(JmsPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
+        JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
+        assertEquals(JmsDefaultPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
         policy.setMaxPrefetchSize(10);
         assertEquals(10, policy.getMaxPrefetchSize());
     }
 
     @Test
     public void testMaxPrefetchSizeIsHonored() {
-        JmsPrefetchPolicy policy = new JmsPrefetchPolicy();
-        assertEquals(JmsPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
+        JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
+        assertEquals(JmsDefaultPrefetchPolicy.MAX_PREFETCH_SIZE, policy.getMaxPrefetchSize());
         policy.setMaxPrefetchSize(42);
         assertEquals(42, policy.getMaxPrefetchSize());
 
@@ -94,12 +96,12 @@ public class JmsPrefetchPolicyTest {
 
     @Test
     public void testSetAll() {
-        JmsPrefetchPolicy policy = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
 
-        assertEquals(JmsPrefetchPolicy.DEFAULT_TOPIC_PREFETCH, policy.getTopicPrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH, policy.getDurableTopicPrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH, policy.getQueuePrefetch());
-        assertEquals(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH, policy.getQueueBrowserPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH, policy.getTopicPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH, policy.getDurableTopicPrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH, policy.getQueuePrefetch());
+        assertEquals(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH, policy.getQueueBrowserPrefetch());
 
         policy.setAll(42);
 
@@ -111,19 +113,19 @@ public class JmsPrefetchPolicyTest {
 
     @Test
     public void testEqualsObject() {
-        JmsPrefetchPolicy policy1 = new JmsPrefetchPolicy();
-        JmsPrefetchPolicy policy2 = new JmsPrefetchPolicy();
+        JmsPrefetchPolicy policy1 = new JmsDefaultPrefetchPolicy();
+        JmsPrefetchPolicy policy2 = new JmsDefaultPrefetchPolicy();
 
         assertEquals(policy1, policy1);
         assertEquals(policy1, policy2);
 
-        JmsPrefetchPolicy policy3 = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy3 = new JmsDefaultPrefetchPolicy();
         policy3.setTopicPrefetch(10);
-        JmsPrefetchPolicy policy4 = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy4 = new JmsDefaultPrefetchPolicy();
         policy4.setQueuePrefetch(10);
-        JmsPrefetchPolicy policy5 = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy5 = new JmsDefaultPrefetchPolicy();
         policy5.setDurableTopicPrefetch(10);
-        JmsPrefetchPolicy policy6 = new JmsPrefetchPolicy();
+        JmsDefaultPrefetchPolicy policy6 = new JmsDefaultPrefetchPolicy();
         policy6.setQueueBrowserPrefetch(10);
 
         assertFalse(policy1.equals(policy3));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index a776a76..4f59bc8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -45,8 +45,8 @@ import javax.jms.Topic;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
-import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
@@ -595,8 +595,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCannotUseMessageListener() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
             connection.start();
 
             testPeer.expectBegin();
@@ -724,12 +723,12 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             // Expect receiver link attach and send credit
             testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
             if (!localCheckOnly) {
                 // If not doing local-check only, expect the credit to be drained
                 // and then replenished to open the window again
-                testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
-                testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+                testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+                testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
             }
 
             MessageConsumer consumer = session.createConsumer(queue);
@@ -778,10 +777,10 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             // Expect receiver link attach and send credit
             testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
 
             // Expect drain but do not respond so that the consumer times out.
-            testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
 
             // Consumer should close due to timed waiting for drain.
             testPeer.expectDetach(true, true, true);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
index 52ad5ff..1c3820b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
@@ -38,7 +38,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
@@ -94,9 +94,9 @@ public class MessageExpirationIntegrationTest extends QpidJmsTestCase {
             assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)m).getText());
 
             // Verify the other message is not there. Will drain to be sure there are no messages.
-            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - 2)));
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - 2)));
             // Then reopen the credit window afterwards
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
 
             m = consumer.receive(10);
             assertNull("Message should not have been received", m);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org