You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:04 UTC

[3/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends.

QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3a03663b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3a03663b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3a03663b

Branch: refs/heads/master
Commit: 3a03663b79f98f80cd75f297cd9b70241ac68da3
Parents: 6553cfd
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 13:09:56 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 13:09:56 2016 -0400

----------------------------------------------------------------------
 .../apache/qpid/jms/JmsCompletionListener.java  |  47 ++
 .../java/org/apache/qpid/jms/JmsConnection.java |  36 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   6 +-
 .../org/apache/qpid/jms/JmsMessageProducer.java | 100 +++-
 .../java/org/apache/qpid/jms/JmsSession.java    | 349 +++++++++--
 .../jms/message/JmsInboundMessageDispatch.java  |  21 +-
 .../jms/message/JmsOutboundMessageDispatch.java |  42 +-
 .../jms/provider/DefaultProviderListener.java   |   9 +
 .../qpid/jms/provider/ProviderListener.java     |  22 +
 .../qpid/jms/provider/ProviderWrapper.java      |  10 +
 .../jms/provider/amqp/AmqpAbstractResource.java |   5 +-
 .../amqp/AmqpAnonymousFallbackProducer.java     |  17 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  55 +-
 .../qpid/jms/provider/amqp/AmqpEventSink.java   |   6 +-
 .../jms/provider/amqp/AmqpExceptionBuilder.java |  34 ++
 .../jms/provider/amqp/AmqpFixedProducer.java    | 257 +++++---
 .../qpid/jms/provider/amqp/AmqpProducer.java    |  15 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  39 +-
 .../provider/amqp/AmqpTransactionContext.java   | 155 +++--
 .../amqp/AmqpTransactionCoordinator.java        |   4 +-
 .../amqp/builders/AmqpResourceBuilder.java      |  13 +-
 .../jms/provider/failover/FailoverProvider.java |  18 +
 .../integration/ConsumerIntegrationTest.java    | 109 ++++
 .../PresettledProducerIntegrationTest.java      | 231 ++++++++
 .../integration/ProducerIntegrationTest.java    | 592 +++++++++++++++++++
 .../jms/integration/SessionIntegrationTest.java | 114 +++-
 .../jms/producer/JmsMessageProducerTest.java    | 421 ++++++++++++-
 .../failover/FailoverIntegrationTest.java       | 194 ++++++
 .../qpid/jms/provider/mock/MockProvider.java    |  12 +-
 .../mock/MockProviderConfiguration.java         |  10 +
 .../qpid/jms/provider/mock/MockRemotePeer.java  | 123 ++++
 31 files changed, 2793 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
new file mode 100644
index 0000000..7a6c4d6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Message;
+
+/**
+ * Interface used to implement listeners for asynchronous {@link javax.jms.Message}
+ * sends which will be notified on successful completion of a send or be notified of an
+ * error that was encountered while attempting to send a {@link javax.jms.Message}.
+ */
+public interface JmsCompletionListener {
+
+    /**
+     * Called when an asynchronous send operation completes successfully.
+     *
+     * @param message
+     *      the {@link javax.jms.Message} that was successfully sent.
+     */
+    void onCompletion(Message message);
+
+    /**
+     * Called when an asynchronous send operation fails to complete; the state
+     * of the send is unknown at this point.
+     *
+     * @param message
+     *      the {@link javax.jms.Message} that was to be sent.
+     * @param exception
+     *      the {@link java.lang.Exception} that describes the send error.
+     */
+    void onException(Message message, Exception exception);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 827da11..a04d1b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -156,6 +156,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public void close() throws JMSException {
         boolean interrupted = Thread.interrupted();
 
+        for (JmsSession session : sessions.values()) {
+            session.checkIsDeliveryThread();
+            session.checkIsCompletionThread();
+        }
+
         try {
 
             if (!closed.get() && !failed.get()) {
@@ -1072,6 +1077,26 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     @Override
+    public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+        JmsSession session = sessions.get(envelope.getProducerId().getParentId());
+        if (session != null) {
+            session.onCompletedMessageSend(envelope);
+        } else {
+            LOG.debug("No matching Session found for async send result");
+        }
+    }
+
+    @Override
+    public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+        JmsSession session = sessions.get(envelope.getProducerId().getParentId());
+        if (session != null) {
+            session.onFailedMessageSend(envelope, cause);
+        } else {
+            LOG.debug("No matching Session found for failed async send result");
+        }
+    }
+
+    @Override
     public void onConnectionInterrupted(final URI remoteURI) {
         for (JmsSession session : sessions.values()) {
             session.onConnectionInterrupted();
@@ -1161,6 +1186,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public void onConnectionFailure(final IOException ex) {
         providerFailed(ex);
 
+        // Signal that the connection dropped; we need to mark transactions as
+        // failed, deliver failure events to asynchronous send completions, etc.
+        for (JmsSession session : sessions.values()) {
+            session.onConnectionInterrupted();
+        }
+
         onProviderException(ex);
 
         for (AsyncResult request : requests.keySet()) {
@@ -1304,10 +1335,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         if (!closed.get() && !closing.get()) {
             if (this.exceptionListener != null) {
 
-                if (!(error instanceof JMSException)) {
-                    error = JmsExceptionSupport.create(error);
-                }
-                final JMSException jmsError = (JMSException)error;
+                final JMSException jmsError = JmsExceptionSupport.create(error);
 
                 executor.execute(new Runnable() {
                     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index de7ef63..18fd764 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -438,10 +438,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             }
 
             if (this.messageListener != null && this.started) {
-                session.getExecutor().execute(new MessageDeliverTask());
+                session.getDispatcherExecutor().execute(new MessageDeliverTask());
             } else {
                 if (availableListener != null) {
-                    session.getExecutor().execute(new Runnable() {
+                    session.getDispatcherExecutor().execute(new Runnable() {
                         @Override
                         public void run() {
                             if (session.isStarted()) {
@@ -507,7 +507,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
     void drainMessageQueueToListener() {
         if (this.messageListener != null && this.started) {
-            session.getExecutor().execute(new MessageDeliverTask());
+            session.getDispatcherExecutor().execute(new MessageDeliverTask());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index a1bbe38..65812b7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -158,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
             throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
         }
 
-        sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive);
+        sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, null);
     }
 
     @Override
@@ -174,15 +174,107 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
             throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
         }
 
-        sendMessage(destination, message, deliveryMode, priority, timeToLive);
+        sendMessage(destination, message, deliveryMode, priority, timeToLive, null);
     }
 
-    private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+    /**
+     * Sends the message asynchronously and notifies the assigned listener on success or failure
+     *
+     * @param message
+     *      the {@link javax.jms.Message} to send.
+     * @param listener
+     *      the {@link JmsCompletionListener} to notify on send success or failure.
+     *
+     * @throws JMSException if an error occurs while attempting to send the Message.
+     */
+    public void send(Message message, JmsCompletionListener listener) throws JMSException {
+        send(message, this.deliveryMode, this.priority, this.timeToLive, listener);
+    }
+
+    /**
+     * Sends the message asynchronously and notifies the assigned listener on success or failure
+     *
+     * @param message
+     *      the {@link javax.jms.Message} to send.
+     * @param deliveryMode
+     *      the delivery mode to assign to the outbound Message.
+     * @param priority
+     *      the priority to assign to the outbound Message.
+     * @param timeToLive
+     *      the time to live value to assign to the outbound Message.
+     * @param listener
+     *      the {@link JmsCompletionListener} to notify on send success or failure.
+     *
+     * @throws JMSException if an error occurs while attempting to send the Message.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+        checkClosed();
+
+        if (anonymousProducer) {
+            throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
+        }
+
+        if (listener == null) {
+            throw new IllegalArgumentException("JmsCompletetionListener cannot be null");
+        }
+
+        sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener);
+    }
+
+    /**
+     * Sends the message asynchronously and notifies the assigned listener on success or failure
+     *
+     * @param destination
+     *      the Destination to send the given Message to.
+     * @param message
+     *      the {@link javax.jms.Message} to send.
+     * @param listener
+     *      the {@link JmsCompletionListener} to notify on send success or failure.
+     *
+     * @throws JMSException if an error occurs while attempting to send the Message.
+     */
+    public void send(Destination destination, Message message, JmsCompletionListener listener) throws JMSException {
+        send(destination, message, this.deliveryMode, this.priority, this.timeToLive, listener);
+    }
+
+    /**
+     * Sends the message asynchronously and notifies the assigned listener on success or failure
+     *
+     * @param destination
+     *      the Destination to send the given Message to.
+     * @param message
+     *      the {@link javax.jms.Message} to send.
+     * @param deliveryMode
+     *      the delivery mode to assign to the outbound Message.
+     * @param priority
+     *      the priority to assign to the outbound Message.
+     * @param timeToLive
+     *      the time to live value to assign to the outbound Message.
+     * @param listener
+     *      the {@link JmsCompletionListener} to notify on send success or failure.
+     *
+     * @throws JMSException if an error occurs while attempting to send the Message.
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+        checkClosed();
+
+        if (!anonymousProducer) {
+            throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
+        }
+
+        if (listener == null) {
+            throw new IllegalArgumentException("JmsCompletetionListener cannot be null");
+        }
+
+        sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
+    }
+
+    private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
         if (destination == null) {
             throw new InvalidDestinationException("Don't understand null destinations");
         }
 
-        this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp);
+        this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, listener);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 4644267..817f342 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -18,8 +18,11 @@ package org.apache.qpid.jms;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -58,6 +61,7 @@ import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.message.JmsMessageTransformation;
@@ -98,14 +102,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
         new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
     private final JmsSessionInfo sessionInfo;
-    private volatile ExecutorService executor;
     private final ReentrantLock sendLock = new ReentrantLock();
+    private volatile ExecutorService deliveryExecutor;
+    private volatile ExecutorService completionExcecutor;
+    private Thread deliveryThread;
+    private Thread completionThread;
 
     private final AtomicLong consumerIdGenerator = new AtomicLong();
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private JmsTransactionContext transactionContext;
     private boolean sessionRecovered;
     private final AtomicReference<Exception> failureCause = new AtomicReference<Exception>();
+    private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
 
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
         this.connection = connection;
@@ -178,6 +186,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public void commit() throws JMSException {
         checkClosed();
+        checkIsCompletionThread();
 
         if (!getTransacted()) {
             throw new javax.jms.IllegalStateException("Not a transacted session");
@@ -189,6 +198,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public void rollback() throws JMSException {
         checkClosed();
+        checkIsCompletionThread();
 
         if (!getTransacted()) {
             throw new javax.jms.IllegalStateException("Not a transacted session");
@@ -223,6 +233,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     @Override
     public void close() throws JMSException {
+        checkIsDeliveryThread();
+        checkIsCompletionThread();
+
         if (!closed.get()) {
             doClose();
         }
@@ -272,11 +285,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             }
 
             transactionContext.shutdown();
+
+            synchronized (sessionInfo) {
+                if (completionExcecutor != null) {
+                    completionExcecutor.shutdown();
+                    completionExcecutor = null;
+                }
+            }
         }
     }
 
     void sessionClosed(Exception cause) {
         try {
+            // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
+            //        asynchronous send completions that they are failed when the session
+            //        is remotely closed.
+            getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
             shutdown(cause);
         } catch (Throwable error) {
             LOG.trace("Ignoring exception thrown during cleanup of closed session", error);
@@ -306,6 +330,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
         try {
             if (producer != null) {
+                // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
+                //        asynchronous send completions that they are failed when the producer
+                //        is remotely closed.
+                getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
+                    producer.getProducerId(), JmsExceptionSupport.create(cause)));
                 producer.shutdown(cause);
             }
         } catch (Throwable error) {
@@ -624,17 +653,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         connection.onException(ex);
     }
 
-    protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+    protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
         JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
 
         if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
             throw new IllegalStateException("Temporary destination has been deleted");
         }
 
-        send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp);
+        send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, listener);
     }
 
-    private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+    private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
         sendLock.lock();
         try {
             original.setJMSDeliveryMode(deliveryMode);
@@ -707,14 +736,35 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             envelope.setMessage(copy);
             envelope.setProducerId(producer.getProducerId());
             envelope.setDestination(destination);
-            envelope.setSendAsync(!sync);
+            envelope.setSendAsync(listener == null ? !sync : true);
             envelope.setDispatchId(messageSequence);
+            envelope.setCompletionRequired(listener != null);
 
             if (producer.isAnonymous()) {
                 envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
             }
 
-            transactionContext.send(connection, envelope);
+            SendCompletion completion = null;
+            if (envelope.isCompletionRequired()) {
+                completion = new SendCompletion(envelope, listener);
+                asyncSendQueue.addLast(completion);
+            }
+
+            try {
+                transactionContext.send(connection, envelope);
+            } catch (JMSException jmsEx) {
+                // If the synchronous portion of the send fails, the completion may or
+                // may not have been notified, depending on the circumstances of the
+                // failure; remove it from the queue and check if it is already completed.
+                if (completion != null) {
+                    asyncSendQueue.remove(completion);
+                    if (completion.hasCompleted()) {
+                        return;
+                    }
+                }
+
+                throw jmsEx;
+            }
         } finally {
             sendLock.unlock();
         }
@@ -837,9 +887,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
 
         synchronized (sessionInfo) {
-            if (executor != null) {
-                executor.shutdown();
-                executor = null;
+            if (deliveryExecutor != null) {
+                deliveryExecutor.shutdown();
+                deliveryExecutor = null;
             }
         }
     }
@@ -852,29 +902,62 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return connection;
     }
 
-    Executor getExecutor() {
-        ExecutorService exec = executor;
-        if(exec == null) {
+    Executor getDispatcherExecutor() {
+        ExecutorService exec = deliveryExecutor;
+        if (exec == null) {
             synchronized (sessionInfo) {
-                if (executor == null) {
-                    executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-                        @Override
-                        public Thread newThread(Runnable runner) {
-                            Thread executor = new Thread(runner);
-                            executor.setName("JmsSession ["+ sessionInfo.getId() + "] dispatcher");
-                            executor.setDaemon(true);
-                            return executor;
-                        }
-                    });
+                if (deliveryExecutor == null) {
+                    deliveryExecutor = createExecutor("delivery dispatcher");
                 }
 
-                exec = executor;
+                exec = deliveryExecutor;
+                exec.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        JmsSession.this.deliveryThread = Thread.currentThread();
+                    }
+                });
             }
         }
 
         return exec;
     }
 
+    Executor getCompletionExecutor() {
+        ExecutorService exec = completionExcecutor;
+        if (exec == null) {
+            synchronized (sessionInfo) {
+                if (completionExcecutor == null) {
+                    completionExcecutor = createExecutor("completion dispatcher");
+                }
+
+                exec = completionExcecutor;
+                exec.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        JmsSession.this.completionThread = Thread.currentThread();
+                    }
+                });
+            }
+        }
+
+        return exec;
+    }
+
+    private ExecutorService createExecutor(final String threadNameSuffix) {
+        return Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread executor = new Thread(runner);
+                executor.setName("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix);
+                executor.setDaemon(true);
+                return executor;
+            }
+        });
+    }
+
     protected JmsSessionInfo getSessionInfo() {
         return sessionInfo;
     }
@@ -925,6 +1008,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
+    void checkIsDeliveryThread() throws JMSException {
+        if (Thread.currentThread().equals(deliveryThread)) {
+            throw new IllegalStateException("Illegal invocation from MessageListener callback");
+        }
+    }
+
+    void checkIsCompletionThread() throws JMSException {
+        if (Thread.currentThread().equals(completionThread)) {
+            throw new IllegalStateException("Illegal invocation from CompletionListener callback");
+        }
+    }
+
     public JmsMessageIDPolicy getMessageIDPolicy() {
         return sessionInfo.getMessageIDPolicy();
     }
@@ -945,6 +1040,36 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return sessionInfo.getDeserializationPolicy();
     }
 
+    /**
+     * Sets the transaction context of the session.
+     *
+     * @param transactionContext
+     *        provides the means to control a JMS transaction.
+     */
+    public void setTransactionContext(JmsTransactionContext transactionContext) {
+        this.transactionContext = transactionContext;
+    }
+
+    /**
+     * Returns the transaction context of the session.
+     *
+     * @return transactionContext
+     *         session's transaction context.
+     */
+    public JmsTransactionContext getTransactionContext() {
+        return transactionContext;
+    }
+
+    boolean isSessionRecovered() {
+        return sessionRecovered;
+    }
+
+    void clearSessionRecovered() {
+        sessionRecovered = false;
+    }
+
+    //----- Event handlers ---------------------------------------------------//
+
     @Override
     public void onInboundMessage(JmsInboundMessageDispatch envelope) {
         if (started.get()) {
@@ -954,10 +1079,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
+    protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
+        getCompletionExecutor().execute(new AsyncCompletionTask(envelope));
+    }
+
+    protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
+        getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause));
+    }
+
     protected void onConnectionInterrupted() {
 
         transactionContext.onConnectionInterrupted();
 
+        // TODO - Synthesize a better exception
+        JMSException failureCause = new JMSException("Send failed due to connection loss");
+        getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause));
+
         for (JmsMessageProducer producer : producers.values()) {
             producer.onConnectionInterrupted();
         }
@@ -1019,31 +1156,155 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
-    /**
-     * Sets the transaction context of the session.
-     *
-     * @param transactionContext
-     *        provides the means to control a JMS transaction.
-     */
-    public void setTransactionContext(JmsTransactionContext transactionContext) {
-        this.transactionContext = transactionContext;
-    }
+    //----- Asynchronous Send Helpers ----------------------------------------//
 
-    /**
-     * Returns the transaction context of the session.
-     *
-     * @return transactionContext
-     *         session's transaction context.
-     */
-    public JmsTransactionContext getTransactionContext() {
-        return transactionContext;
+    private final class FailOrCompleteAsyncCompletionsTask implements Runnable {
+
+        private final JMSException failureCause;
+        private final JmsProducerId producerId;
+
+        public FailOrCompleteAsyncCompletionsTask(JMSException failureCause) {
+            this(null, failureCause);
+        }
+
+        public FailOrCompleteAsyncCompletionsTask(JmsProducerId producerId, JMSException failureCause) {
+            this.failureCause = failureCause;
+            this.producerId = producerId;
+        }
+
+        @Override
+        public void run() {
+            // For any completion that is not yet marked as complete we fail it
+            // otherwise we send the already marked completion state event.
+            Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+            while (pending.hasNext()) {
+                SendCompletion completion = pending.next();
+
+                if (!completion.hasCompleted()) {
+                    if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+                        completion.markAsFailed(failureCause);
+                    }
+                }
+
+                try {
+                    completion.signalCompletion();
+                } catch (Throwable error) {
+                    LOG.trace("Signaled completion of send: {}", completion.envelope);
+                }
+            }
+
+            asyncSendQueue.clear();
+        }
     }
 
-    boolean isSessionRecovered() {
-        return sessionRecovered;
+    private final class AsyncCompletionTask implements Runnable {
+
+        private final JmsOutboundMessageDispatch envelope;
+        private final Throwable cause;
+
+        public AsyncCompletionTask(JmsOutboundMessageDispatch envelope) {
+            this(envelope, null);
+        }
+
+        public AsyncCompletionTask(JmsOutboundMessageDispatch envelope, Throwable cause) {
+            this.envelope = envelope;
+            this.cause = cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                SendCompletion completion = asyncSendQueue.peek();
+                if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
+                    try {
+                        completion = asyncSendQueue.remove();
+                        if (cause == null) {
+                            completion.markAsComplete();
+                        } else {
+                            completion.markAsFailed(JmsExceptionSupport.create(cause));
+                        }
+                        completion.signalCompletion();
+                    } catch (Throwable error) {
+                        LOG.trace("Failed while performing send completion: {}", envelope);
+                        // TODO - What now?
+                    }
+
+                    // Signal any trailing completions that were marked complete before
+                    // this one was, now that the one in front of them has been signaled.
+                    Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+                    while (pending.hasNext()) {
+                        completion = pending.next();
+                        if (completion.hasCompleted()) {
+                            try {
+                                completion.signalCompletion();
+                            } catch (Throwable error) {
+                                LOG.trace("Failed while performing send completion: {}", envelope);
+                                // TODO - What now?
+                            } finally {
+                                pending.remove();
+                            }
+                        } else {
+                            break;
+                        }
+                    }
+                } else {
+                    // Not head so mark as complete and wait for the one in front to send
+                    // the notification of completion.
+                    Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+                    while (pending.hasNext()) {
+                        completion = pending.next();
+                        if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
+                            if (cause == null) {
+                                completion.markAsComplete();
+                            } else {
+                                completion.markAsFailed(JmsExceptionSupport.create(cause));
+                            }
+                        }
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.debug("Send completion task encounted unexpected error: {}", ex.getMessage());
+                // TODO - What now
+            }
+        }
     }
 
-    void clearSessionRecovered() {
-        sessionRecovered = false;
+    private final class SendCompletion {
+
+        private final JmsOutboundMessageDispatch envelope;
+        private final JmsCompletionListener listener;
+
+        private Exception failureCause;
+        private boolean completed;
+
+        public SendCompletion(JmsOutboundMessageDispatch envelope, JmsCompletionListener listener) {
+            this.envelope = envelope;
+            this.listener = listener;
+        }
+
+        public void markAsComplete() {
+            completed = true;
+        }
+
+        public void markAsFailed(Exception cause) {
+            completed = true;
+            failureCause = cause;
+        }
+
+        public boolean hasCompleted() {
+            return completed;
+        }
+
+        public void signalCompletion() {
+            if (failureCause == null) {
+                listener.onCompletion(envelope.getMessage());
+            } else {
+                listener.onException(envelope.getMessage(), failureCause);
+            }
+        }
+
+        public JmsOutboundMessageDispatch getEnvelope() {
+            return envelope;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index 577ac17..c038519 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -30,6 +30,8 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
     private JmsMessage message;
     private boolean enqueueFirst;
 
+    private transient String stringView;
+
     public JmsInboundMessageDispatch(long sequence) {
         this.sequence = sequence;
     }
@@ -74,10 +76,21 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
 
     @Override
     public String toString() {
-        return "JmsInboundMessageDispatch {sequence = " + sequence
-                                      + ", messageId = " + messageId
-                                      + ", consumerId = " + consumerId
-                                      + "}";
+        if (stringView == null) {
+            StringBuilder builder = new StringBuilder();
+
+            builder.append("JmsInboundMessageDispatch { sequence = ");
+            builder.append(sequence);
+            builder.append(", messageId = ");
+            builder.append(messageId);
+            builder.append(", consumerId = ");
+            builder.append(consumerId);
+            builder.append(" }");
+
+            stringView = builder.toString();
+        }
+
+        return stringView;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
index 05b9089..a34768f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
@@ -29,8 +29,11 @@ public class JmsOutboundMessageDispatch {
     private JmsDestination destination;
     private boolean sendAsync;
     private boolean presettle;
+    private boolean completionRequired;
     private long dispatchId;
 
+    private transient String stringView;
+
     public JmsDestination getDestination() {
         return destination;
     }
@@ -39,6 +42,10 @@ public class JmsOutboundMessageDispatch {
         this.destination = destination;
     }
 
+    public Object getMessageId() {
+        return message.getFacade().getProviderMessageIdObject();
+    }
+
     public JmsMessage getMessage() {
         return message;
     }
@@ -79,15 +86,34 @@ public class JmsOutboundMessageDispatch {
         this.presettle = presettle;
     }
 
-    @Override
-    public String toString() {
-        StringBuilder value = new StringBuilder();
+    public boolean isCompletionRequired() {
+        return completionRequired;
+    }
 
-        value.append("JmsOutboundMessageDispatch {dispatchId = ");
-        value.append(getProducerId());
-        value.append("-");
-        value.append(getDispatchId());
+    public void setCompletionRequired(boolean completionRequired) {
+        this.completionRequired = completionRequired;
+    }
 
-        return value.toString();
+    @Override
+    public String toString() {
+        if (stringView == null) {
+            StringBuilder value = new StringBuilder();
+
+            value.append("JmsOutboundMessageDispatch {dispatchId = ");
+            value.append(getProducerId());
+            value.append("-");
+            value.append(getDispatchId());
+            value.append(", MessageID = ");
+            try {
+                value.append(message.getJMSMessageID());
+            } catch (Throwable e) {
+                value.append("<unknown>");
+            }
+            value.append(" }");
+
+            stringView = value.toString();
+        }
+
+        return stringView;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index 22e204e..d2eb95c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResource;
 
 /**
@@ -32,6 +33,14 @@ public class DefaultProviderListener implements ProviderListener {
     }
 
     @Override
+    public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+    }
+
+    @Override
+    public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+    }
+
+    @Override
     public void onConnectionInterrupted(URI remoteURI) {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index 5c758ed..11a5f6b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResource;
 
 /**
@@ -36,6 +37,27 @@ public interface ProviderListener {
     void onInboundMessage(JmsInboundMessageDispatch envelope);
 
     /**
+     * Called when an outbound message dispatch that requested a completion callback
+     * has reached a state where the send can be considered successful based on the QoS
+     * level associated with the outbound message.
+     *
+     * @param envelope
+     *      the original outbound message dispatch that is now complete.
+     */
+    void onCompletedMessageSend(JmsOutboundMessageDispatch envelope);
+
+    /**
+     * Called when an outbound message dispatch that requested a completion callback
+     * has reached a state where the send can be considered failed.
+     *
+     * @param envelope
+     *      the original outbound message dispatch that should be treated as a failed send.
+     * @param cause
+     *      the exception that describes the cause of the failed send.
+     */
+    void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause);
+
+    /**
      * Called from a fault tolerant Provider instance to signal that the underlying
      * connection to the Broker has been lost.  The Provider will attempt to reconnect
      * following this event unless closed.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 8574e04..3a3d383 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -154,6 +154,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
+    public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+        listener.onCompletedMessageSend(envelope);
+    }
+
+    @Override
+    public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+        listener.onFailedMessageSend(envelope, cause);
+    }
+
+    @Override
     public void onConnectionInterrupted(URI remoteURI) {
         listener.onConnectionInterrupted(remoteURI);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index a8599ac..634cafd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -23,6 +23,7 @@ import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.slf4j.Logger;
@@ -158,7 +159,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
             closeOrDetachEndpoint();
         }
 
-        // Process the close now, so that child close operations see the correct state.
+        // Process the close before moving on to closing down child resources
         provider.pumpToProtonTransport();
 
         handleResourceClosure(provider, error);
@@ -253,7 +254,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     }
 
     @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
         // Nothing do be done here, subclasses can override as needed.
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index b44e3b3..3e07b25 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -69,7 +69,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     }
 
     @Override
-    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+    public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
         LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
 
         // Force sends marked as asynchronous to be sent synchronous so that the temporary
@@ -91,7 +91,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
             // We open a Fixed Producer instance with the target destination.  Once it opens
             // it will trigger the open event which will in turn trigger the send event.
-            // If caching is disabled the created producer will be closed immediately.
+            // If caching is disabled the created producer will be closed immediately after
+            // the entire send chain has finished and the delivery has been acknowledged.
             AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
             builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
 
@@ -100,9 +101,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
                 producerCache.put(envelope.getDestination(), builder.getResource());
             }
 
-            return true;
+            getParent().getProvider().pumpToProtonTransport(request);
         } else {
-            return producer.send(envelope, request);
+            producer.send(envelope, request);
         }
     }
 
@@ -135,6 +136,14 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         return new JmsProducerId(producerIdKey, -1, producerIdCount++);
     }
 
+    @Override
+    public void addSendCompletionWatcher(AsyncResult watcher) {
+        throw new UnsupportedOperationException(
+            "The fallback producer parent should never have a watcher assigned.");
+    }
+
+    //----- AsyncResult objects used to complete the sends -------------------//
+
     private abstract class AnonymousRequest extends WrappedAsyncResult {
 
         protected final JmsOutboundMessageDispatch envelope;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index c15fd08..89586e3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -385,42 +385,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
-        Delivery incoming = null;
-        do {
-            incoming = getEndpoint().current();
-            if (incoming != null) {
-                if (incoming.isReadable() && !incoming.isPartial()) {
-                    LOG.trace("{} has incoming Message(s).", this);
-                    try {
-                        if (processDelivery(incoming)) {
-                            // We processed a message, signal completion
-                            // of a message pull request if there is one.
-                            if (pullRequest != null) {
-                                pullRequest.onSuccess();
-                                pullRequest = null;
-                            }
-                        }
-                    } catch (Exception e) {
-                        throw IOExceptionSupport.create(e);
+    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+        if (delivery.isReadable() && !delivery.isPartial()) {
+            LOG.trace("{} has incoming Message(s).", this);
+            try {
+                if (processDelivery(delivery)) {
+                    // We processed a message, signal completion
+                    // of a message pull request if there is one.
+                    if (pullRequest != null) {
+                        pullRequest.onSuccess();
+                        pullRequest = null;
                     }
-                } else {
-                    LOG.trace("{} has a partial incoming Message(s), deferring.", this);
-                    incoming = null;
                 }
-            } else {
-                // We have exhausted the locally queued messages on this link.
-                // Check if we tried to stop and have now run out of credit.
-                if (getEndpoint().getRemoteCredit() <= 0) {
-                    if (stopRequest != null) {
-                        stopRequest.onSuccess();
-                        stopRequest = null;
-                    }
+            } catch (Exception e) {
+                throw IOExceptionSupport.create(e);
+            }
+        }
+
+        if (getEndpoint().current() == null) {
+            // We have exhausted the locally queued messages on this link.
+            // Check if we tried to stop and have now run out of credit.
+            if (getEndpoint().getRemoteCredit() <= 0) {
+                if (stopRequest != null) {
+                    stopRequest.onSuccess();
+                    stopRequest = null;
                 }
             }
-        } while (incoming != null);
+        }
 
-        super.processDeliveryUpdates(provider);
+        super.processDeliveryUpdates(provider, delivery);
     }
 
     private boolean processDelivery(Delivery incoming) throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
index b3e7501..2e25ad1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
 
+import org.apache.qpid.proton.engine.Delivery;
+
 /**
  * Interface used by classes that want to process AMQP events sent from
  * the transport layer.
@@ -60,10 +62,12 @@ public interface AmqpEventSink {
      *
      * @param provider
      *        the AmqpProvider instance for easier access to fire events.
+     * @param delivery
+     *        the Delivery that has an update to its state which needs to be handled.
      *
      * @throws IOException if an error occurs while processing the update.
      */
-    void processDeliveryUpdates(AmqpProvider provider) throws IOException;
+    void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException;
 
     /**
      * Called when the Proton Engine signals an Flow related event has been triggered

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
new file mode 100644
index 0000000..2ecf245
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+/**
+ * Used to provide a source for an exception based on some event such as
+ * operation timed out, etc.
+ */
+public interface AmqpExceptionBuilder {
+
+    /**
+     * Creates an exception appropriate to some failure condition
+     *
+     * @return a new Exception instance that describes a failure condition.
+     */
+    Exception createException();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index c354822..9233ce1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -18,10 +18,10 @@ package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 
 import javax.jms.IllegalStateException;
@@ -62,10 +62,12 @@ public class AmqpFixedProducer extends AmqpProducer {
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
     private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
-    private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
-    private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
+    private final Map<Object, InFlightSend> sent = new LinkedHashMap<Object, InFlightSend>();
+    private final Map<Object, InFlightSend> blocked = new LinkedHashMap<Object, InFlightSend>();
     private byte[] encodeBuffer = new byte[1024 * 8];
 
+    private AsyncResult sendCompletionWatcher;
+
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
         super(session, info);
     }
@@ -86,29 +88,24 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     @Override
-    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+    public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
         if (isClosed()) {
             request.onFailure(new IllegalStateException("The MessageProducer is closed"));
         }
 
         if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");
-            // Once a message goes into a held mode we no longer can send it async, so
-            // we clear the async flag if set to avoid the sender never getting notified.
-            envelope.setSendAsync(false);
 
             InFlightSend send = new InFlightSend(envelope, request);
 
             if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
-                send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
-                    send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage()));
+                send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
             }
 
-            blocked.addLast(send);
-            return false;
+            blocked.put(envelope.getMessageId(), send);
+            getParent().getProvider().pumpToProtonTransport(request);
         } else {
             doSend(envelope, request);
-            return true;
         }
     }
 
@@ -133,35 +130,54 @@ public class AmqpFixedProducer extends AmqpProducer {
             delivery = getEndpoint().delivery(tag, 0, tag.length);
         }
 
-        delivery.setContext(request);
-
         if (session.isTransacted()) {
-            Binary amqpTxId = session.getTransactionContext().getAmqpTransactionId();
+            AmqpTransactionContext context = session.getTransactionContext();
+            Binary amqpTxId = context.getAmqpTransactionId();
             TransactionalState state = new TransactionalState();
             state.setTxnId(amqpTxId);
             delivery.disposition(state);
+            context.registerTxProducer(this);
         }
 
         AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade) facade;
         encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery);
 
+        AmqpProvider provider = getParent().getProvider();
+
+        InFlightSend send = null;
+        if (request instanceof InFlightSend) {
+            send = (InFlightSend) request;
+        } else {
+            send = new InFlightSend(envelope, request);
+
+            if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE) {
+                send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
+            }
+        }
+
         if (presettle) {
             delivery.settle();
         } else {
-            sent.add(delivery);
+            sent.put(envelope.getMessageId(), send);
             getEndpoint().advance();
         }
 
-        if (envelope.isSendAsync() || presettle) {
-            request.onSuccess();
-        } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) {
-            InFlightSend send = new InFlightSend(envelope, request);
-
-            send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
-                send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage()));
-
-            // Update context so the incoming disposition can cancel any pending timeout
-            delivery.setContext(send);
+        send.setDelivery(delivery);
+        delivery.setContext(send);
+
+        // Put it on the wire and let it fail if the connection is broken, if it does
+        // get written then continue on to determine when we should complete it.
+        if (provider.pumpToProtonTransport(request)) {
+            // For presettled messages we can just mark as successful and we are done, but
+            // for any other message we still track it until the remote settles.  If the send
+            // was tagged as asynchronous we must mark the original request as complete but
+            // we still need to wait for the disposition before we can consider the send as
+            // having been successful.
+            if (presettle) {
+                send.onSuccess();
+            } else if (envelope.isSendAsync()) {
+                send.getOriginalRequest().onSuccess();
+            }
         }
     }
 
@@ -195,13 +211,16 @@ public class AmqpFixedProducer extends AmqpProducer {
     @Override
     public void processFlowUpdates(AmqpProvider provider) throws IOException {
         if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) {
-            while (getEndpoint().getCredit() > 0 && !blocked.isEmpty()) {
+            Iterator<InFlightSend> blockedSends = blocked.values().iterator();
+            while (getEndpoint().getCredit() > 0 && blockedSends.hasNext()) {
                 LOG.trace("Dispatching previously held send");
-                InFlightSend held = blocked.pop();
+                InFlightSend held = blockedSends.next();
                 try {
-                    doSend(held.envelope, held);
+                    doSend(held.getEnvelope(), held);
                 } catch (JMSException e) {
                     throw IOExceptionSupport.create(e);
+                } finally {
+                    blockedSends.remove();
                 }
             }
         }
@@ -211,25 +230,18 @@ public class AmqpFixedProducer extends AmqpProducer {
             getEndpoint().drained();
         }
 
-        // Once the pending sends queue is drained we can propagate the close request.
-        if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
-            super.close(closeRequest);
-        }
-
         super.processFlowUpdates(provider);
     }
 
     @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
-        List<Delivery> toRemove = new ArrayList<Delivery>();
-
-        for (Delivery delivery : sent) {
-            DeliveryState state = delivery.getRemoteState();
-            if (state == null) {
-                continue;
-            }
+    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+        DeliveryState state = delivery.getRemoteState();
+        if (state != null) {
 
+            InFlightSend send = (InFlightSend) delivery.getContext();
+            Exception deliveryError = null;
             Outcome outcome = null;
+
             if (state instanceof TransactionalState) {
                 LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
                 outcome = ((TransactionalState) state).getOutcome();
@@ -240,14 +252,9 @@ public class AmqpFixedProducer extends AmqpProducer {
                 outcome = null;
             }
 
-            AsyncResult request = (AsyncResult) delivery.getContext();
-            Exception deliveryError = null;
-
             if (outcome instanceof Accepted) {
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
-                if (request != null && !request.isComplete()) {
-                    request.onSuccess();
-                }
+                send.onSuccess();
             } else if (outcome instanceof Rejected) {
                 LOG.trace("Outcome of delivery was rejected: {}", delivery);
                 ErrorCondition remoteError = ((Rejected) outcome).getError();
@@ -265,21 +272,11 @@ public class AmqpFixedProducer extends AmqpProducer {
             }
 
             if (deliveryError != null) {
-                if (request != null && !request.isComplete()) {
-                    request.onFailure(deliveryError);
-                } else {
-                    connection.getProvider().fireNonFatalProviderException(deliveryError);
-                }
+                send.onFailure(deliveryError);
             }
-
-            tagGenerator.returnTag(delivery.getTag());
-            toRemove.add(delivery);
-            delivery.settle();
         }
 
-        sent.removeAll(toRemove);
-
-        super.processDeliveryUpdates(provider);
+        super.processDeliveryUpdates(provider, delivery);
     }
 
     public AmqpSession getSession() {
@@ -312,45 +309,45 @@ public class AmqpFixedProducer extends AmqpProducer {
             error = new JMSException("Producer closed remotely before message transfer result was notified");
         }
 
-        for (Delivery delivery : sent) {
+        Collection<InFlightSend> inflightSends = new ArrayList<InFlightSend>(sent.values());
+        for (InFlightSend send : inflightSends) {
             try {
-                AsyncResult request = (AsyncResult) delivery.getContext();
-
-                if (request != null && !request.isComplete()) {
-                    request.onFailure(error);
-                }
-
-                delivery.settle();
-                tagGenerator.returnTag(delivery.getTag());
+                send.onFailure(error);
             } catch (Exception e) {
-                LOG.debug("Caught exception when failing pending send during remote producer closure: {}", delivery, e);
+                LOG.debug("Caught exception when failing pending send during remote producer closure: {}", send, e);
             }
         }
 
-        sent.clear();
-
-        for (InFlightSend blockedSend : blocked) {
+        Collection<InFlightSend> blockedSends = new ArrayList<InFlightSend>(blocked.values());
+        for (InFlightSend send : blockedSends) {
             try {
-                AsyncResult request = blockedSend.request;
-                if (request != null && !request.isComplete()) {
-                    request.onFailure(error);
-                }
+                send.onFailure(error);
             } catch (Exception e) {
-                LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", blockedSend, e);
+                LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", send, e);
             }
         }
+    }
 
-        blocked.clear();
+    @Override
+    public void addSendCompletionWatcher(AsyncResult watcher) {
+        // If none pending signal done already.
+        // TODO - If we don't include blocked sends then update this.
+        if (blocked.isEmpty() && sent.isEmpty()) {
+            watcher.onSuccess();
+        } else {
+            this.sendCompletionWatcher = watcher;
+        }
     }
 
     //----- Class used to manage held sends ----------------------------------//
 
-    private class InFlightSend implements AsyncResult {
+    private class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
 
-        public final JmsOutboundMessageDispatch envelope;
-        public final AsyncResult request;
+        private final JmsOutboundMessageDispatch envelope;
+        private final AsyncResult request;
 
-        public ScheduledFuture<?> requestTimeout;
+        private Delivery delivery;
+        private ScheduledFuture<?> requestTimeout;
 
         public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
             this.envelope = envelope;
@@ -359,31 +356,103 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         @Override
         public void onFailure(Throwable cause) {
-            if (requestTimeout != null) {
-                requestTimeout.cancel(false);
-                requestTimeout = null;
-            }
-
-            blocked.remove(this);
+            handleSendCompletion(false);
 
-            request.onFailure(cause);
+            if (request.isComplete()) {
+                // Asynchronous sends can still be awaiting a completion in which case we
+                // send to them otherwise send to the listener to be reported.
+                if (envelope.isCompletionRequired()) {
+                    getParent().getProvider().getProviderListener().onFailedMessageSend(envelope, cause);
+                } else {
+                    getParent().getProvider().fireNonFatalProviderException(IOExceptionSupport.create(cause));
+                }
+            } else {
+                request.onFailure(cause);
+            }
         }
 
         @Override
         public void onSuccess() {
-            if (requestTimeout != null) {
-                requestTimeout.cancel(false);
-                requestTimeout = null;
+            handleSendCompletion(true);
+
+            if (!request.isComplete()) {
+                request.onSuccess();
             }
 
-            blocked.remove(this);
+            if (envelope.isCompletionRequired()) {
+                getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+            }
+        }
 
-            request.onSuccess();
+        public void setRequestTimeout(ScheduledFuture<?> requestTimeout) {
+            if (this.requestTimeout != null) {
+                this.requestTimeout.cancel(false);
+            }
+
+            this.requestTimeout = requestTimeout;
+        }
+
+        public JmsOutboundMessageDispatch getEnvelope() {
+            return envelope;
+        }
+
+        public AsyncResult getOriginalRequest() {
+            return request;
+        }
+
+        public void setDelivery(Delivery delivery) {
+            this.delivery = delivery;
+        }
+
+        public Delivery getDelivery() {
+            return delivery;
         }
 
         @Override
         public boolean isComplete() {
             return request.isComplete();
         }
+
+        /**
+         * Releases what is held for this send (timeout task, tracking map entry and, when
+         * a delivery was created, the delivery and its tag), then signals the send completion
+         * watcher and any awaiting close once no blocked or in-flight sends remain.
+         */
+        private void handleSendCompletion(boolean successful) {
+            setRequestTimeout(null);
+
+            if (getDelivery() != null) {
+                sent.remove(envelope.getMessageId());
+                delivery.settle();
+                tagGenerator.returnTag(delivery.getTag());
+            } else {
+                blocked.remove(envelope.getMessageId());
+            }
+
+            // TODO - Should this take blocked sends into consideration.
+            // Signal the watcher that all pending sends have completed if one is registered
+            // and both the in-flight sends and blocked sends have completed.
+            if (sendCompletionWatcher != null && sent.isEmpty() && blocked.isEmpty()) {
+                // Clear the watcher before signalling so that it is notified exactly once.
+                AsyncResult watcher = sendCompletionWatcher;
+                sendCompletionWatcher = null;
+                watcher.onSuccess();
+            }
+
+            // Once the pending sends queue is drained and all in-flight sends have been
+            // settled we can propagate the close request.
+            if (isAwaitingClose() && !isClosed() && blocked.isEmpty() && sent.isEmpty()) {
+                AmqpFixedProducer.super.close(closeRequest);
+            }
+        }
+
+        @Override
+        public Exception createException() {
+            if (delivery == null) {
+                return new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage());
+            } else {
+                return new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage());
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 74fe457..3ace6d2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -54,13 +54,10 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
      * @param request
      *        The AsyncRequest that will be notified on send success or failure.
      *
-     * @return true if the producer had credit to send or false if there was no available
-     *         credit and the send needed to be deferred.
-     *
      * @throws IOException if an error occurs sending the message
      * @throws JMSException if an error occurs while preparing the message for send.
      */
-    public abstract boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
+    public abstract void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
 
     /**
      * @return true if this is an anonymous producer or false if fixed to a given destination.
@@ -92,4 +89,14 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
     public void setPresettle(boolean presettle) {
         this.presettle = presettle;
     }
+
+    /**
+     * Allows a completion request to be added to this producer that will be notified
+     * once all outstanding sends have completed.
+     *
+     * @param watcher
+     *      The AsyncResult that will be signaled once this producer has no pending sends.
+     */
+    public abstract void addSendCompletionWatcher(AsyncResult watcher);
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3bfe099..ca8a0e1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -59,6 +59,7 @@ import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
@@ -477,11 +478,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         producer = session.getProducer(producerId);
                     }
 
-                    boolean couldSend = producer.send(envelope, request);
-                    pumpToProtonTransport(request);
-                    if (couldSend && envelope.isSendAsync()) {
-                        request.onSuccess();
-                    }
+                    producer.send(envelope, request);
                 } catch (Throwable t) {
                     request.onFailure(t);
                 }
@@ -816,7 +813,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     case DELIVERY:
                         amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
                         if (amqpEventSink != null) {
-                            amqpEventSink.processDeliveryUpdates(this);
+                            amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
                         }
                         break;
                     default:
@@ -1175,6 +1172,36 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         return null;
     }
 
+    /**
+     * Allows a resource to request that its parent resource schedule a future
+     * cancellation of a request and return it a {@link ScheduledFuture} instance that
+     * can be used to cancel the scheduled automatic failure of the request.
+     *
+     * @param request
+     *      The request that should be marked as failed based on configuration.
+     * @param timeout
+     *      The time to wait before marking the request as failed.
+     * @param builder
+     *      An AmqpExceptionBuilder to use when creating a timed out exception.
+     *
+     * @return a {@link ScheduledFuture} that can be stored by the caller, or null if the timeout is infinite.
+     */
+    public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final AmqpExceptionBuilder builder) {
+        if (timeout != JmsConnectionInfo.INFINITE) {
+            return serializer.schedule(new Runnable() {
+
+                @Override
+                public void run() {
+                    request.onFailure(builder.createException());
+                    pumpToProtonTransport();
+                }
+
+            }, timeout, TimeUnit.MILLISECONDS);
+        }
+
+        return null;
+    }
+
     //----- Internal implementation ------------------------------------------//
 
     private void checkClosed() throws ProviderClosedException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 577c128..2d08bc7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -44,6 +44,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
     private final AmqpSession session;
     private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
+    private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>();
 
     private JmsTransactionId current;
     private AmqpTransactionCoordinator coordinator;
@@ -85,7 +86,6 @@ public class AmqpTransactionContext implements AmqpResourceParent {
             }
         };
 
-
         if (coordinator == null || coordinator.isClosed()) {
             AmqpTransactionCoordinatorBuilder builder =
                 new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
@@ -115,7 +115,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         }
     }
 
-    public void commit(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
+    public void commit(final JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
         if (!transactionInfo.getId().equals(current)) {
             if (!transactionInfo.isInDoubt() && current == null) {
                 throw new IllegalStateException("Commit called with no active Transaction.");
@@ -128,29 +128,10 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
         preCommit();
 
-        LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
-        coordinator.discharge(current, new AsyncResult() {
-
-            @Override
-            public void onSuccess() {
-                current = null;
-                postCommit();
-                request.onSuccess();
-            }
-
-            @Override
-            public void onFailure(Throwable result) {
-                current = null;
-                postCommit();
-                request.onFailure(result);
-            }
-
-            @Override
-            public boolean isComplete() {
-                return current == null;
-            }
+        DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
 
-        }, true);
+        LOG.trace("TX Context[{}] committing current TX[{}]", this, current);
+        coordinator.discharge(current, dischargeResult, true);
     }
 
     public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
@@ -167,29 +148,17 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
         preRollback();
 
-        LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
-        coordinator.discharge(current, new AsyncResult() {
-
-            @Override
-            public void onSuccess() {
-                current = null;
-                postRollback();
-                request.onSuccess();
-            }
-
-            @Override
-            public void onFailure(Throwable result) {
-                current = null;
-                postRollback();
-                request.onFailure(result);
-            }
+        DischargeCompletion dischargeResult = new DischargeCompletion(request, false);
 
-            @Override
-            public boolean isComplete() {
-                return current == null;
+        if (txProducers.isEmpty()) {
+            LOG.trace("TX Context[{}] rolling back current TX[{}]", this, current);
+            coordinator.discharge(current, dischargeResult, false);
+        } else {
+            SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false);
+            for (AmqpProducer producer : txProducers) {
+                producer.addSendCompletionWatcher(producersSendCompletion);
             }
-
-        }, false);
+        }
     }
 
     //----- Context utility methods ------------------------------------------//
@@ -198,6 +167,10 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         txConsumers.add(consumer);
     }
 
+    public void registerTxProducer(AmqpProducer producer) {
+        txProducers.add(producer);
+    }
+
     public AmqpSession getSession() {
         return session;
     }
@@ -243,6 +216,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         }
 
         txConsumers.clear();
+        txProducers.clear();
     }
 
     private void postRollback() {
@@ -251,6 +225,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         }
 
         txConsumers.clear();
+        txProducers.clear();
     }
 
     //----- Resource Parent implementation -----------------------------------//
@@ -273,4 +248,104 @@ public class AmqpTransactionContext implements AmqpResourceParent {
     public AmqpProvider getProvider() {
         return session.getProvider();
     }
+
+    //----- Completion for Commit or Rollback operation ----------------------//
+
+    /**
+     * Wraps the request for a commit or rollback so that the current transaction is
+     * cleared, and the matching post commit or post rollback work is run, before the
+     * wrapped request is notified of the discharge result.
+     */
+    private class DischargeCompletion implements AsyncResult {
+
+        private final AsyncResult request;
+        private final boolean commit;
+
+        public DischargeCompletion(AsyncResult request, boolean commit) {
+            this.request = request;
+            this.commit = commit;
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            cleanup();
+            request.onFailure(result);
+        }
+
+        @Override
+        public void onSuccess() {
+            cleanup();
+            request.onSuccess();
+        }
+
+        @Override
+        public boolean isComplete() {
+            return request.isComplete();
+        }
+
+        private void cleanup() {
+            current = null;
+            if (commit) {
+                postCommit();
+            } else {
+                postRollback();
+            }
+        }
+    }
+
+    //----- Completion result for Producers ----------------------------------//
+
+    /**
+     * Watcher shared by the producers that took part in the transaction.  Each signal
+     * counts down the pending completions and the last one discharges the transaction,
+     * any failure reported along the way forces that discharge to be a rollback.
+     */
+    @SuppressWarnings("unused")
+    private class SendCompletion implements AsyncResult {
+
+        private int pendingCompletions;
+
+        private final JmsTransactionInfo info;
+        private final DischargeCompletion request;
+
+        private boolean commit;
+
+        public SendCompletion(JmsTransactionInfo info, DischargeCompletion request, int pendingCompletions, boolean commit) {
+            this.info = info;
+            this.request = request;
+            this.pendingCompletions = pendingCompletions;
+            this.commit = commit;
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            if (--pendingCompletions == 0) {
+                try {
+                    LOG.trace("TX Context[{}] rolling back current TX[{}]", AmqpTransactionContext.this, current);
+                    coordinator.discharge(current, request, false);
+                } catch (Throwable error) {
+                    request.onFailure(error);
+                }
+            } else {
+                commit = false;
+            }
+        }
+
+        @Override
+        public void onSuccess() {
+            if (--pendingCompletions == 0) {
+                try {
+                    LOG.trace("TX Context[{}] {} current TX[{}]", AmqpTransactionContext.this, commit ? "committing" : "rolling back", current);
+                    coordinator.discharge(current, request, commit);
+                } catch (Throwable error) {
+                    request.onFailure(error);
+                }
+            }
+        }
+
+        @Override
+        public boolean isComplete() {
+            return request.isComplete();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org