You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/04/30 00:19:06 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-131

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4915a8fdf -> bac662d54


https://issues.apache.org/jira/browse/QPIDJMS-131

Initial pass of addition of JmsPresettlePolicy to allow finer control
over presettlement in the JMS client.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bac662d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bac662d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bac662d5

Branch: refs/heads/master
Commit: bac662d540ab51e418ef2b328c56507d2f9f13cb
Parents: 4915a8f
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Apr 29 18:18:48 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Apr 29 18:18:48 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  13 +-
 .../apache/qpid/jms/JmsConnectionFactory.java   |  60 +-
 .../org/apache/qpid/jms/JmsMessageProducer.java |  17 +-
 .../org/apache/qpid/jms/JmsPresettlePolicy.java | 169 ++++++
 .../apache/qpid/jms/JmsRedeliveryPolicy.java    |  30 +
 .../java/org/apache/qpid/jms/JmsSession.java    |  14 +-
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |  24 +
 .../apache/qpid/jms/meta/JmsProducerInfo.java   |  19 +
 .../amqp/AmqpAnonymousFallbackProducer.java     |   6 +-
 .../qpid/jms/provider/amqp/AmqpConnection.java  |   1 +
 .../jms/provider/amqp/AmqpFixedProducer.java    |   9 +-
 .../amqp/builders/AmqpProducerBuilder.java      |  10 +-
 .../PresettledProducerIntegrationTest.java      | 563 +++++++++++++++++++
 .../integration/ProducerIntegrationTest.java    |  32 --
 14 files changed, 883 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index fd2d5be..4374954 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     private ExceptionListener exceptionListener;
     private JmsMessageFactory messageFactory;
     private Provider provider;
-    private JmsMessageIDBuilder messageIDBuilder;
 
     private final Set<JmsConnectionListener> connectionListeners =
         new CopyOnWriteArraySet<JmsConnectionListener>();
@@ -863,6 +862,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         connectionInfo.setRedeliveryPolicy(redeliveryPolicy);
     }
 
+    public JmsPresettlePolicy getPresettlePolicy() {
+        return connectionInfo.getPresettlePolicy();
+    }
+
+    public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+        connectionInfo.setPresettlePolicy(presettlePolicy);
+    }
+
     public boolean isReceiveLocalOnly() {
         return connectionInfo.isReceiveLocalOnly();
     }
@@ -1003,11 +1010,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     public JmsMessageIDBuilder getMessageIDBuilder() {
-        return messageIDBuilder;
+        return connectionInfo.getMessageIDBuilder();
     }
 
     void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
-        this.messageIDBuilder = messageIDBuilder;
+        connectionInfo.setMessageIDBuilder(messageIDBuilder);
     }
 
     public boolean isPopulateJMSXUserID() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 087f18d..9fb708d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -86,6 +86,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
     private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
     private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
+    private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
     private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
 
     public JmsConnectionFactory() {
@@ -290,39 +291,31 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
         try {
             if (this.remoteURI.getQuery() != null) {
                 Map<String, String> map = PropertyUtil.parseQuery(this.remoteURI.getQuery());
-                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");
-
-                Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
-                if (!unused.isEmpty()) {
-                    String msg = ""
-                        + " Not all jms options could be set on the ConnectionFactory."
-                        + " Check the options are spelled correctly."
-                        + " Unused parameters=[" + unused + "]."
-                        + " This connection factory cannot be started.";
-                    throw new IllegalArgumentException(msg);
-                } else {
-                    this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map);
-                }
+                applyURIOptions(map);
+                this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map);
             } else if (URISupport.isCompositeURI(this.remoteURI)) {
                 CompositeData data = URISupport.parseComposite(this.remoteURI);
-                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");
-                Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
-                if (!unused.isEmpty()) {
-                    String msg = ""
-                        + " Not all jms options could be set on the ConnectionFactory."
-                        + " Check the options are spelled correctly."
-                        + " Unused parameters=[" + unused + "]."
-                        + " This connection factory cannot be started.";
-                    throw new IllegalArgumentException(msg);
-                } else {
-                    this.remoteURI = data.toURI();
-                }
+                applyURIOptions(data.getParameters());
+                this.remoteURI = data.toURI();
             }
         } catch (Exception e) {
             throw new IllegalArgumentException(e.getMessage());
         }
     }
 
+    private void applyURIOptions(Map<String, String> options) throws IllegalArgumentException {
+        Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(options, "jms.");
+        Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
+        if (!unused.isEmpty()) {
+            String msg = ""
+                + " Not all jms options could be set on the ConnectionFactory."
+                + " Check the options are spelled correctly."
+                + " Unused parameters=[" + unused + "]."
+                + " This connection factory cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+    }
+
     /**
      * @return the user name used for connection authentication.
      */
@@ -524,6 +517,23 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     }
 
     /**
+     * @return the presettlePolicy that is currently configured.
+     */
+    public JmsPresettlePolicy getPresettlePolicy() {
+        return presettlePolicy;
+    }
+
+    /**
+     * Sets the JmsPresettlePolicy that is applied to MessageProducers.
+     *
+     * @param presettlePolicy
+     *      the presettlePolicy to use by connections created from this factory.
+     */
+    public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+        this.presettlePolicy = presettlePolicy;
+    }
+
+    /**
      * @return the currently configured client ID prefix for auto-generated client IDs.
      */
     public synchronized String getClientIDPrefix() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 600f26e..84e4017 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -40,7 +40,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     protected final JmsSession session;
     protected final JmsConnection connection;
     protected JmsProducerInfo producerInfo;
-    protected final boolean flexibleDestination;
+    protected final boolean anonymousProducer;
     protected int deliveryMode = DeliveryMode.PERSISTENT;
     protected int priority = Message.DEFAULT_PRIORITY;
     protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
@@ -53,9 +53,10 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
         this.session = session;
         this.connection = session.getConnection();
-        this.flexibleDestination = destination == null;
+        this.anonymousProducer = destination == null;
         this.producerInfo = new JmsProducerInfo(producerId);
         this.producerInfo.setDestination(destination);
+        this.producerInfo.setPresettle(session.getPresettlePolicy().isSendPresttled(destination, session));
 
         session.getConnection().createResource(producerInfo);
     }
@@ -141,7 +142,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         checkClosed();
 
-        if (flexibleDestination) {
+        if (anonymousProducer) {
             throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
         }
 
@@ -157,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         checkClosed();
 
-        if (!flexibleDestination) {
+        if (!anonymousProducer) {
             throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
         }
 
@@ -231,6 +232,14 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         }
     }
 
+    protected boolean isPresettled() {
+        return producerInfo.isPresettle();
+    }
+
+    protected boolean isAnonymous() {
+        return anonymousProducer;
+    }
+
     ////////////////////////////////////////////////////////////////////////////
     // Connection interruption handlers.
     ////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
new file mode 100644
index 0000000..c6079d6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Policy object that allows for configuration of options that affect when
+ * a JMS MessageProducer will result in AMQP presettled message sends.
+ */
+public class JmsPresettlePolicy {
+
+    private boolean presettleAll;
+    private boolean presettleProducers;
+    private boolean presettleTopicProducers;
+    private boolean presettleQueueProducers;
+    private boolean presettleTransactedProducers;
+
+    public JmsPresettlePolicy() {
+    }
+
+    public JmsPresettlePolicy(JmsPresettlePolicy source) {
+        this.presettleAll = source.presettleAll;
+        this.presettleProducers = source.presettleProducers;
+        this.presettleTopicProducers = source.presettleTopicProducers;
+        this.presettleQueueProducers = source.presettleQueueProducers;
+        this.presettleTransactedProducers = source.presettleTransactedProducers;
+    }
+
+    public JmsPresettlePolicy copy() {
+        return new JmsPresettlePolicy(this);
+    }
+
+    /**
+     * @return the presettleAll setting for this policy
+     */
+    public boolean isPresettleAll() {
+        return presettleAll;
+    }
+
+    /**
+     * Sets the presettle all sends option.  When true all MessageProducers
+     * will send their messages presettled.
+     *
+     * @param presettleAll
+     *      the presettleAll value to apply.
+     */
+    public void setPresettleAll(boolean presettleAll) {
+        this.presettleAll = presettleAll;
+    }
+
+    /**
+     * @return the presettleProducers setting for this policy.
+     */
+    public boolean isPresettleProducers() {
+        return presettleProducers;
+    }
+
+    /**
+     * Sets the the presettle all sends option.  When true all MessageProducers that
+     * are created will send their messages as settled.
+     *
+     * @param presettleProducers
+     *      the presettleProducers value to apply.
+     */
+    public void setPresettleProducers(boolean presettleProducers) {
+        this.presettleProducers = presettleProducers;
+    }
+
+    /**
+     * @return the presettleTopicProducers setting for this policy
+     */
+    public boolean isPresettleTopicProducers() {
+        return presettleTopicProducers;
+    }
+
+    /**
+     * Sets the presettle Topic sends option.  When true any MessageProducer that
+     * is created that sends to a Topic will send its messages presettled, and any
+     * anonymous MessageProducer will send Messages that are sent to a Topic as
+     * presettled as well.
+     *
+     * @param presettleTopicProducers
+     *      the presettleTopicProducers value to apply.
+     */
+    public void setPresettleTopicProducers(boolean presettleTopicProducers) {
+        this.presettleTopicProducers = presettleTopicProducers;
+    }
+
+    /**
+     * @return the presettleQueueSends setting for this policy
+     */
+    public boolean isPresettleQueueProducers() {
+        return presettleQueueProducers;
+    }
+
+    /**
+     * Sets the presettle Queue sends option.  When true any MessageProducer that
+     * is created that sends to a Queue will send its messages presettled, and any
+     * anonymous MessageProducer will send Messages that are sent to a Queue as
+     * presettled as well.
+     *
+     * @param presettleQueueProducers
+     *      the presettleQueueSends value to apply.
+     */
+    public void setPresettleQueueProducers(boolean presettleQueueProducers) {
+        this.presettleQueueProducers = presettleQueueProducers;
+    }
+
+    /**
+     * @return the presettleTransactedSends setting for this policy
+     */
+    public boolean isPresettleTransactedProducers() {
+        return presettleTransactedProducers;
+    }
+
+    /**
+     * Sets the presettle in transactions option.  When true any MessageProducer that is
+     * operating inside of a transacted session will send its messages presettled.
+     *
+     * @param presettleTransactedProducers the presettleTransactedSends to set
+     */
+    public void setPresettleTransactedProducers(boolean presettleTransactedProducers) {
+        this.presettleTransactedProducers = presettleTransactedProducers;
+    }
+
+    /**
+     * Determines when a producer will send message presettled.
+     * <p>
+     * Called when the a producer is being created to determine whether the producer will
+     * be configured to send all its message as presettled or not.
+     * <p>
+     * For an anonymous producer this method is called on each send to allow the policy to
+     * be applied to the target destination that the message will be sent to.
+     *
+     * @param destination
+     *      the destination that the producer will be sending to.
+     * @param session
+     *      the session that owns the producer that will send be sending a message.
+     *
+     * @return true if the producer should send presettled.
+     */
+    public boolean isSendPresttled(JmsDestination destination, JmsSession session) {
+
+        if (presettleAll || presettleProducers) {
+            return true;
+        } else if (session.isTransacted() && presettleTransactedProducers) {
+            return true;
+        } else if (destination != null && destination.isQueue() && presettleQueueProducers) {
+            return true;
+        } else if (destination != null && destination.isTopic() && presettleTopicProducers) {
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
index 395b982..c57e238 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
@@ -59,4 +59,34 @@ public class JmsRedeliveryPolicy {
     public void setMaxRedeliveries(int maxRedeliveries) {
         this.maxRedeliveries = maxRedeliveries;
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + maxRedeliveries;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        JmsRedeliveryPolicy other = (JmsRedeliveryPolicy) obj;
+        if (maxRedeliveries != other.maxRedeliveries) {
+            return false;
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index c00cdbd..a3e2b27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -93,7 +93,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final AtomicBoolean started = new AtomicBoolean();
     private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
         new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
-    private JmsPrefetchPolicy prefetchPolicy;
+    private final JmsPrefetchPolicy prefetchPolicy;
+    private final JmsPresettlePolicy presettlePolicy;
     private final JmsMessageIDBuilder messageIDBuilder;
     private final JmsSessionInfo sessionInfo;
     private volatile ExecutorService executor;
@@ -108,7 +109,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
         this.connection = connection;
         this.acknowledgementMode = acknowledgementMode;
-        this.prefetchPolicy = new JmsPrefetchPolicy(connection.getPrefetchPolicy());
+        this.prefetchPolicy = connection.getPrefetchPolicy().copy();
+        this.presettlePolicy = connection.getPresettlePolicy().copy();
         this.messageIDBuilder = connection.getMessageIDBuilder();
 
         if (acknowledgementMode == SESSION_TRANSACTED) {
@@ -696,6 +698,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             envelope.setSendAsync(!sync);
             envelope.setDispatchId(messageSequence);
 
+            if (producer.isAnonymous()) {
+                envelope.setPresettle(presettlePolicy.isSendPresttled(destination, this));
+            }
+
             transactionContext.send(connection, envelope);
         } finally {
             sendLock.unlock();
@@ -910,8 +916,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return prefetchPolicy;
     }
 
-    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
-        this.prefetchPolicy = prefetchPolicy;
+    public JmsPresettlePolicy getPresettlePolicy() {
+        return presettlePolicy;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index d97d33d..3af306d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -20,7 +20,9 @@ import java.net.URI;
 import java.nio.charset.Charset;
 
 import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.JmsPresettlePolicy;
 import org.apache.qpid.jms.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
 
 /**
  * Meta object that contains the JmsConnection identification and configuration
@@ -59,6 +61,8 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
 
     private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
     private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
+    private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
+    private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
 
     private volatile byte[] encodedUserId;
 
@@ -89,6 +93,10 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
         copy.topicPrefix = topicPrefix;
         copy.connectTimeout = connectTimeout;
         copy.validatePropertyNames = validatePropertyNames;
+        copy.messageIDBuilder = messageIDBuilder;
+        copy.prefetchPolicy = prefetchPolicy.copy();
+        copy.redeliveryPolicy = redeliveryPolicy.copy();
+        copy.presettlePolicy = presettlePolicy.copy();
     }
 
     public boolean isForceAsyncSend() {
@@ -264,6 +272,22 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
         this.redeliveryPolicy = redeliveryPolicy.copy();
     }
 
+    public JmsPresettlePolicy getPresettlePolicy() {
+        return presettlePolicy;
+    }
+
+    public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+        this.presettlePolicy = presettlePolicy;
+    }
+
+    public JmsMessageIDBuilder getMessageIDBuilder() {
+        return messageIDBuilder;
+    }
+
+    public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
+        this.messageIDBuilder = messageIDBuilder;
+    }
+
     public boolean isPopulateJMSXUserID() {
         return populateJMSXUserID;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
index 53939a9..8b1e019 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
@@ -22,6 +22,7 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce
 
     private final JmsProducerId producerId;
     private JmsDestination destination;
+    private boolean presettle;
 
     public JmsProducerInfo(JmsProducerId producerId) {
         if (producerId == null) {
@@ -66,6 +67,24 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce
         this.destination = destination;
     }
 
+    /**
+     * @return the presettle mode of this producer.
+     */
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    /**
+     * Sets the presettle mode of the producer, when true the producer will be created
+     * as a presettled producer and all messages it sends will be settled before dispatch.
+     *
+     * @param presettle
+     *      the presettle option to set on this producer.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
     @Override
     public String toString() {
         return "JmsProducerInfo { " + getId() + ", destination = " + getDestination() + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index a115f72..71f26e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -45,7 +45,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class);
     private static final IdGenerator producerIdGenerator = new IdGenerator();
 
-    private final AnonymousProducerCache producerCache = new AnonymousProducerCache(10);
+    private final AnonymousProducerCache producerCache;
     private final String producerIdKey = producerIdGenerator.generateId();
     private long producerIdCount;
 
@@ -61,7 +61,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         super(session, info);
 
         if (connection.isAnonymousProducerCache()) {
+            producerCache = new AnonymousProducerCache(10);
             producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize());
+        } else {
+            producerCache = null;
         }
     }
 
@@ -79,6 +82,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             // send to the given AMQP target.
             JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
             info.setDestination(envelope.getDestination());
+            info.setPresettle(this.getResourceInfo().isPresettle());
 
             // We open a Fixed Producer instance with the target destination.  Once it opens
             // it will trigger the open event which will in turn trigger the send event.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index a7e818c..f284c8a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -119,6 +119,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         return remoteURI;
     }
 
+    @Override
     public AmqpProvider getProvider() {
         return provider;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index ef526c9..48e9d72 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.message.Message;
@@ -63,7 +64,6 @@ public class AmqpFixedProducer extends AmqpProducer {
     private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
     private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
     private byte[] encodeBuffer = new byte[1024 * 8];
-    private boolean presettle = false;
 
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
         super(session, info);
@@ -287,13 +287,8 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     @Override
-    public void setPresettle(boolean presettle) {
-        this.presettle = presettle;
-    }
-
-    @Override
     public boolean isPresettle() {
-        return presettle;
+        return getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     public long getSendTimeout() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
index 74fe18d..0e0f3f4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
@@ -66,7 +66,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
         Sender sender = getParent().getEndpoint().sender(senderName);
         sender.setSource(source);
         sender.setTarget(target);
-        if (getParent().getConnection().isPresettleProducers()) {
+        if (resourceInfo.isPresettle() || getParent().getConnection().isPresettleProducers()) {
             sender.setSenderSettleMode(SenderSettleMode.SETTLED);
         } else {
             sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
@@ -78,13 +78,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
 
     @Override
     protected AmqpProducer createResource(AmqpSession parent, JmsProducerInfo resourceInfo, Sender endpoint) {
-        AmqpProducer producer = new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint);
-
-        if (getParent().getConnection().isPresettleProducers()) {
-            producer.setPresettle(true);
-        }
-
-        return producer;
+        return new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
new file mode 100644
index 0000000..e7b70ef
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.ListDescribedType;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+/**
+ * Test MessageProducers created using various configuration of the presettle options
+ */
+public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY };
+
+    //----- Test the jms.presettleAll option ---------------------------------//
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllSendToTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllSendToQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllSendToTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllSendToTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllAnonymousSendToTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllAnonymousSendToQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true);
+        }
+    }
+
+    //----- Test the amqp.presettleProducers option --------------------------//
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true);
+        }
+    }
+
+    //----- Test the jms.presettleProducers option ---------------------------------//
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersAnonymousTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersAnonymousQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersAnonymousTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleProducersAnonymousTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true);
+        }
+    }
+
+    //----- Test the jms.presettleTopicProducers option ---------------------------------//
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersAnonymousQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, false, true);
+        }
+    }
+
+    //----- Test the jms.presettleQueueProducers option ---------------------------------//
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersAnonymousQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null);
+            doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, false, true);
+        }
+    }
+
+    //----- Test the jms.presettleTransactedProducers option ---------------------------------//
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTempTopic() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTempQueue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTopicNoTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersQueueNoTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false, false);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTempTopicNoTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true, true);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJmsPresettlePolicyPresettleTransactedProducersTempQueueNoTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true");
+            doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false, true);
+        }
+    }
+
+    //----- Test Method implementation ---------------------------------------//
+
+    private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+        doTestProducerWithPresettleOptions(testPeer, connection, false, senderSettled, transferSettled, topic, temporary);
+    }
+
+    private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+        doTestProducerWithPresettleOptions(testPeer, connection, transacted, false, senderSettled, transferSettled, topic, temporary);
+    }
+
+    private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+        testPeer.expectBegin();
+
+        Session session = null;
+        Binary txnId = null;
+
+        if (transacted) {
+            // Expect the session, with an immediate link to the transaction coordinator
+            // using a target with the expected capabilities only.
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectDeclare(txnId);
+
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        Destination destination = null;
+        if (topic) {
+            if (temporary) {
+                String dynamicAddress = "myTempTopicAddress";
+                testPeer.expectTempTopicCreationAttach(dynamicAddress);
+                destination = session.createTemporaryTopic();
+            } else {
+                destination = session.createTopic("myTopic");
+            }
+        } else {
+            if (temporary) {
+                String dynamicAddress = "myTempQueueAddress";
+                testPeer.expectTempQueueCreationAttach(dynamicAddress);
+                destination = session.createTemporaryQueue();
+            } else {
+                destination = session.createQueue("myTopic");
+            }
+            destination = session.createQueue("myQueue");
+        }
+
+        if (senderSettled) {
+            testPeer.expectSettledSenderAttach();
+        } else {
+            testPeer.expectSenderAttach();
+        }
+
+        MessageProducer producer = null;
+        if (anonymous) {
+            producer = session.createProducer(null);
+        } else {
+            producer = session.createProducer(destination);
+        }
+
+        // Create and transfer a new message
+        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+        headersMatcher.withDurable(equalTo(true));
+        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+        messageMatcher.setHeadersMatcher(headersMatcher);
+        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+        Matcher<?> stateMatcher = nullValue();
+        if (transacted) {
+            stateMatcher = new TransactionalStateMatcher();
+            ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId));
+            ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue());
+        }
+
+        ListDescribedType responseState = new Accepted();
+        if (transacted) {
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+        }
+
+        if (transferSettled) {
+            testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false);
+        } else {
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+        }
+
+        Message message = session.createTextMessage();
+
+        if (anonymous) {
+            producer.send(destination, message);
+        } else {
+            producer.send(message);
+        }
+
+        if (transacted) {
+            testPeer.expectDischarge(txnId, true);
+        }
+
+        testPeer.expectClose();
+
+        connection.close();
+
+        testPeer.waitForAllHandlersToComplete(1000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 9625cd1..2e49080 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -1602,36 +1602,4 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
-
-    @Test(timeout = 20000)
-    public void testPresettledProducersConfigurationApplied() throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
-            testPeer.expectBegin();
-            testPeer.expectSettledSenderAttach();
-
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue("myQueue");
-            MessageProducer producer = session.createProducer(queue);
-
-            // Create and transfer a new message
-            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
-                    .withDurable(equalTo(true));
-            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
-            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(headersMatcher);
-            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
-            testPeer.expectTransfer(messageMatcher, nullValue(), true, false, null, false);
-            testPeer.expectClose();
-
-            Message message = session.createTextMessage();
-
-            producer.send(message);
-            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
-
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org