You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:03 UTC

[2/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index 2fa2644..d18e95b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -67,7 +67,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
     }
 
     @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
         try {
             if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
                 DeliveryState state = pendingDelivery.getRemoteState();
@@ -105,7 +105,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
                 }
             }
 
-            super.processDeliveryUpdates(provider);
+            super.processDeliveryUpdates(provider, delivery);
         } catch (Exception e) {
             throw IOExceptionSupport.create(e);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 229e61f..5912f7f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -24,10 +24,12 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
+import org.apache.qpid.jms.provider.amqp.AmqpExceptionBuilder;
 import org.apache.qpid.jms.provider.amqp.AmqpProvider;
 import org.apache.qpid.jms.provider.amqp.AmqpResource;
 import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * @param <INFO> The Type of JmsResource used to describe the target resource.
  * @param <ENDPOINT> The AMQP Endpoint that the target resource encapsulates.
  */
-public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink {
+public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink, AmqpExceptionBuilder {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class);
 
@@ -97,7 +99,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
                     return request.isComplete();
                 }
 
-            }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out"));
+            }, getRequestTimeout(), this);
         }
     }
 
@@ -119,7 +121,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
     }
 
     @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
         // No implementation needed here for this event.
     }
 
@@ -185,6 +187,11 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         getRequest().onFailure(openError);
     }
 
+    @Override
+    public Exception createException() {
+        return new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out");
+    }
+
     //----- Implementation methods used to customize the build process -------//
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index e6e36df..2490b19 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -801,6 +801,24 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     }
 
     @Override
+    public void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
+        if (closingConnection.get() || closed.get() || failed.get()) {
+            return;
+        }
+
+        listener.onCompletedMessageSend(envelope);
+    }
+
+    @Override
+    public void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, Throwable cause) {
+        if (closingConnection.get() || closed.get() || failed.get()) {
+            return;
+        }
+
+        listener.onFailedMessageSend(envelope, cause);
+    }
+
+    @Override
     public void onConnectionFailure(final IOException ex) {
         if (closingConnection.get() || closed.get() || failed.get()) {
             return;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index c8914ae..a8fcf2d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.IllegalStateException;
@@ -932,4 +933,112 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testMessageListenerCallsConnectionCloseThrowsIllegalStateException() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    try {
+                        LOG.debug("Async consumer got Message: {}", m);
+                        connection.close();
+                    } catch (Exception ex) {
+                        asyncError.set(ex);
+                    }
+
+                    latch.countDown();
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+            assertNotNull(asyncError.get());
+            assertTrue(asyncError.get() instanceof IllegalStateException);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerCallsSessionCloseThrowsIllegalStateException() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    try {
+                        LOG.debug("Async consumer got Message: {}", m);
+                        session.close();
+                    } catch (Exception ex) {
+                        asyncError.set(ex);
+                    }
+
+                    latch.countDown();
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+            assertNotNull(asyncError.get());
+            assertTrue(asyncError.get() instanceof IllegalStateException);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
index 5485857..2ed9f41 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
@@ -20,8 +20,15 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -30,8 +37,11 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.qpid.jms.JmsCompletionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.ListDescribedType;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -47,12 +57,16 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.TxnCapability;
 import org.hamcrest.Matcher;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test MessageProducers created using various configuration of the presettle options
  */
 public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PresettledProducerIntegrationTest.class);
+
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
     private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY };
@@ -419,4 +433,221 @@ public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    //----- Test the jms.presettleAll with asynchronous completion -----------//
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleAllSendToTopic() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleAllSendToQueue() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testsyncCompletionPresettleAllAnonymousSendToTopic() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testsyncCompletionPresettleAllAnonymousSendToQueue() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class);
+    }
+
+    //----- Test the jms.presettleProducers with asynchronous completion -----//
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleProducersTopic() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleProducersQueue() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleProducersAnonymousTopic() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionPresettleProducersAnonymousQueue() throws Exception {
+        String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+        doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class);
+    }
+
+    //----- Asynchronous Completion test method implementation ---------------//
+
+    private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+        doTestAsyncCompletionProducerWithPresettleOptions(uriOptions, transacted, anonymous, true, senderSettled, transferSettled, destType);
+    }
+
+    private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean relaySupported, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, uriOptions, relaySupported ? serverCapabilities : null, null);
+            testPeer.expectBegin();
+
+            Session session = null;
+            Binary txnId = null;
+
+            if (transacted) {
+                // Expect the session, with an immediate link to the transaction coordinator
+                // using a target with the expected capabilities only.
+                CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+                txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+                testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+                testPeer.expectDeclare(txnId);
+
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            } else {
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+
+            Destination destination = null;
+            if (destType == Queue.class) {
+                destination = session.createQueue("MyQueue");
+            } else if (destType == Topic.class) {
+                destination = session.createTopic("MyTopis");
+            } else if (destType == TemporaryQueue.class) {
+                String dynamicAddress = "myTempQueueAddress";
+                testPeer.expectTempQueueCreationAttach(dynamicAddress);
+                destination = session.createTemporaryQueue();
+            } else if (destType == TemporaryTopic.class) {
+                String dynamicAddress = "myTempTopicAddress";
+                testPeer.expectTempTopicCreationAttach(dynamicAddress);
+                destination = session.createTemporaryTopic();
+            } else {
+                fail("unexpected type");
+            }
+
+            if (senderSettled) {
+                testPeer.expectSettledSenderAttach();
+            } else {
+                testPeer.expectSenderAttach();
+            }
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = null;
+            if (anonymous) {
+                producer = (JmsMessageProducer) session.createProducer(null);
+            } else {
+                producer = (JmsMessageProducer) session.createProducer(destination);
+            }
+
+            // Create and transfer a new message
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            headersMatcher.withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            Matcher<?> stateMatcher = nullValue();
+            if (transacted) {
+                stateMatcher = new TransactionalStateMatcher();
+                ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId));
+                ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue());
+            }
+
+            ListDescribedType responseState = new Accepted();
+            if (transacted) {
+                TransactionalState txState = new TransactionalState();
+                txState.setTxnId(txnId);
+                txState.setOutcome(new Accepted());
+            }
+
+            if (transferSettled) {
+                testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false);
+            } else {
+                testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+            }
+
+            if (anonymous && !relaySupported) {
+                testPeer.expectDetach(true, true, true);
+            }
+
+            Message message = session.createTextMessage();
+
+            if (anonymous) {
+                producer.send(destination, message, listener);
+            } else {
+                producer.send(message, listener);
+            }
+
+            if (transacted) {
+                testPeer.expectDischarge(txnId, true);
+            }
+
+            testPeer.expectClose();
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+            assertEquals(1, listener.successCount);
+            assertEquals(0, listener.errorCount);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private class TestJmsCompletionListener implements JmsCompletionListener {
+
+        private final CountDownLatch completed;
+
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public volatile Message message;
+        public volatile Exception exception;
+
+        public TestJmsCompletionListener() {
+            this(1);
+        }
+
+        public TestJmsCompletionListener(int expected) {
+            this.completed = new CountDownLatch(expected);
+        }
+
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+            this.message = message;
+            this.successCount++;
+
+            completed.countDown();
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+
+            this.message = message;
+            this.exception = exception;
+            this.errorCount++;
+
+            completed.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 6c53398..1e69eb8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -40,7 +40,10 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
@@ -54,9 +57,11 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.qpid.jms.JmsCompletionListener;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.message.foreign.ForeignJmsMessage;
@@ -68,10 +73,13 @@ import org.apache.qpid.jms.test.testpeer.ListDescribedType;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
@@ -1000,6 +1008,79 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testRemotelyEndProducerCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final AtomicBoolean producerClosed = new AtomicBoolean();
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onProducerClosed(MessageProducer producer, Exception exception) {
+                    producerClosed.set(true);
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            // TODO - Can revert to just MessageProducer once JMS 2.0 API is used
+            final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Verify the producer gets marked closed
+            assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount);
+
+            // Verify the session is now marked closed
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue("Producer closed callback didn't trigger", producerClosed.get());
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRemotelyCloseConnectionDuringSyncSend() throws Exception {
         final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
 
@@ -1150,6 +1231,50 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAsyncCompletionGetsTimedOutErrorWhenNoDispostionArrives() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            Message message = session.createTextMessage("text");
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer which we will not send any response for which should cause the
+            // send operation to time out.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            // TODO - Can revert to plain JMS once 2.0 is supported.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (Throwable error) {
+                LOG.info("Caught expected error: {}", error.getMessage());
+                fail("Send should not fail for async.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertTrue(listener.exception instanceof JmsSendTimedOutException);
+            assertNotNull(listener.message);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testSyncSendMessageRejected() throws Exception {
         doSyncSendMessageNotAcceptedTestImpl(new Rejected());
     }
@@ -1709,6 +1834,430 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionResetsBytesMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            Binary payload = new Binary(new byte[] {1, 2, 3, 4});
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(payload.getArray());
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof BytesMessage);
+
+            BytesMessage completed = (BytesMessage) listener.message;
+            assertEquals(payload.getLength(), completed.getBodyLength());
+            byte[] data = new byte[payload.getLength()];
+            completed.readBytes(data);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMessageRejected() throws Exception {
+        doAsyncCompletionSendMessageNotAcceptedTestImpl(new Rejected());
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMessageReleased() throws Exception {
+        doAsyncCompletionSendMessageNotAcceptedTestImpl(new Released());
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMessageModifiedDeliveryFailed() throws Exception {
+        Modified modified = new Modified();
+        modified.setDeliveryFailed(true);
+
+        doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMessageModifiedUndeliverable() throws Exception {
+        Modified modified = new Modified();
+        modified.setUndeliverableHere(true);
+
+        doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMessageModifiedDeliveryFailedUndeliverable() throws Exception {
+        Modified modified = new Modified();
+        modified.setDeliveryFailed(true);
+        modified.setUndeliverableHere(true);
+
+        doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+    }
+
+    private void doAsyncCompletionSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            final CountDownLatch asyncError = new CountDownLatch(1);
+
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.debug("ExceptionListener got error: {}", exception.getMessage());
+                    asyncError.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create a second producer which allows for a safe wait for credit for the
+            // first producer without the need for a sleep.  Otherwise the first producer
+            // might not do an actual async send due to not having received credit yet.
+            session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSessionCloseThrowsIllegalStateException() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null);
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+                @Override
+                public void onCompletion(Message message) {
+
+                    try {
+                        session.close();
+                    } catch (JMSException jmsEx) {
+                        closeError.set(jmsEx);
+                    }
+
+                    super.onCompletion(message);
+                };
+            };
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+            assertNotNull(closeError.get());
+            assertTrue(closeError.get() instanceof IllegalStateException);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionConnectionCloseThrowsIllegalStateException() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null);
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+                @Override
+                public void onCompletion(Message message) {
+
+                    try {
+                        connection.close();
+                    } catch (JMSException jmsEx) {
+                        closeError.set(jmsEx);
+                    }
+
+                    super.onCompletion(message);
+                };
+            };
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertNotNull(closeError.get());
+            assertTrue(closeError.get() instanceof IllegalStateException);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSessionCommitThrowsIllegalStateException() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            testPeer.expectSenderAttach();
+
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            final AtomicReference<JMSException> commitError = new AtomicReference<JMSException>(null);
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+                @Override
+                public void onCompletion(Message message) {
+
+                    try {
+                        session.commit();
+                    } catch (JMSException jmsEx) {
+                        commitError.set(jmsEx);
+                    }
+
+                    super.onCompletion(message);
+                };
+            };
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertNotNull(commitError.get());
+            assertTrue(commitError.get() instanceof IllegalStateException);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSessionRollbackThrowsIllegalStateException() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            testPeer.expectSenderAttach();
+
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            final AtomicReference<JMSException> rollback = new AtomicReference<JMSException>(null);
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+                @Override
+                public void onCompletion(Message message) {
+
+                    try {
+                        session.rollback();
+                    } catch (JMSException jmsEx) {
+                        rollback.set(jmsEx);
+                    }
+
+                    super.onCompletion(message);
+                };
+            };
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertNotNull(rollback.get());
+            assertTrue(rollback.get() instanceof IllegalStateException);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
@@ -1831,4 +2380,47 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    private class TestJmsCompletionListener implements JmsCompletionListener {
+
+        private final CountDownLatch completed;
+
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public volatile Message message;
+        public volatile Exception exception;
+
+        public TestJmsCompletionListener() {
+            this(1);
+        }
+
+        public TestJmsCompletionListener(int expected) {
+            this.completed = new CountDownLatch(expected);
+        }
+
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+            this.message = message;
+            this.successCount++;
+
+            completed.countDown();
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+
+            this.message = message;
+            this.exception = exception;
+            this.errorCount++;
+
+            completed.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 7bf35e4..34d60f1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -57,8 +57,10 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
+import org.apache.qpid.jms.JmsCompletionListener;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSession;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -1446,7 +1448,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            //DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+            // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
 
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -1460,12 +1462,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             // Expect no AMQP traffic when we create the anonymous producer, as it will wait
             // for an actual send to occur on the producer before anything occurs on the wire
 
-            //Create an anonymous producer
+            // Create an anonymous producer
             MessageProducer producer = session.createProducer(null);
             assertNotNull("Producer object was null", producer);
 
-            //Expect a new message sent by the above producer to cause creation of a new
-            //sender link to the given destination, then closing the link after the message is sent.
+            // Expect a new message sent by the above producer to cause creation of a new
+            // sender link to the given destination, then closing the link after the message is sent.
             TargetMatcher targetMatcher = new TargetMatcher();
             targetMatcher.withAddress(equalTo(topicName));
             targetMatcher.withDynamic(equalTo(false));
@@ -1484,7 +1486,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             Message message = session.createMessage();
             producer.send(dest, message);
 
-            //Repeat the send and observe another attach->transfer->detach.
+            // Repeat the send and observe another attach->transfer->detach.
             testPeer.expectSenderAttach(targetMatcher, false, false);
             testPeer.expectTransfer(messageMatcher);
             testPeer.expectDetach(true, true, true);
@@ -1675,6 +1677,78 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testRemotelyEndSessionWithProducerCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final AtomicBoolean sessionClosed = new AtomicBoolean();
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Exception exception) {
+                    sessionClosed.set(true);
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            // TODO - Can revert to just MessageProducer once JMS 2.0 API is used
+            final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Verify the producer gets marked closed
+            assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount);
+            assertEquals(0, listener.successCount);
+
+            // Verify the session is now marked closed
+            try {
+                session.getAcknowledgeMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue("Session closed callback didn't trigger", sessionClosed.get());
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRemotelyEndSessionWithConsumer() throws Exception {
         final String BREAD_CRUMB = "ErrorMessage";
 
@@ -1920,4 +1994,34 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             connection.close();
         }
     }
+
+    private class TestJmsCompletionListener implements JmsCompletionListener {
+
+        private final CountDownLatch completed;
+
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public TestJmsCompletionListener(int expected) {
+            completed = new CountDownLatch(expected);
+        }
+
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+            successCount++;
+            completed.countDown();
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+            errorCount++;
+            completed.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
index 80a6e6a..7117e9f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
@@ -22,35 +22,63 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.qpid.jms.JmsCompletionListener;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsConnectionTestSupport;
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.JmsQueue;
 import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.provider.mock.MockRemotePeer;
+import org.apache.qpid.jms.test.Wait;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test basic functionality around JmsConnection
  */
 public class JmsMessageProducerTest extends JmsConnectionTestSupport {
 
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageProducerTest.class);
+
+    private final MyCompletionListener completionListener = new MyCompletionListener();
     private JmsSession session;
+    private final MockRemotePeer remotePeer = new MockRemotePeer();
 
     @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
+        remotePeer.start();
         connection = createConnectionToMockProvider();
         session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        remotePeer.shutdown();
+        super.tearDown();
+    }
+
     @Test(timeout = 10000)
     public void testMultipleCloseCallsNoErrors() throws Exception {
         MessageProducer producer = session.createProducer(null);
@@ -106,7 +134,7 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
 
     @Test(timeout = 10000)
     public void testAnonymousProducerThrowsUOEWhenExplictDestinationNotProvided() throws Exception {
-        MessageProducer producer = session.createProducer(null);
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
 
         Message message = Mockito.mock(Message.class);
         try {
@@ -117,17 +145,31 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
         }
 
         try {
+            producer.send(message, completionListener);
+            fail("Expected exception not thrown");
+        } catch (UnsupportedOperationException uoe) {
+            // expected
+        }
+
+        try {
             producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
             fail("Expected exception not thrown");
         } catch (UnsupportedOperationException uoe) {
             // expected
         }
+
+        try {
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+            fail("Expected exception not thrown");
+        } catch (UnsupportedOperationException uoe) {
+            // expected
+        }
     }
 
     @Test(timeout = 10000)
     public void testExplicitProducerThrowsUOEWhenExplictDestinationIsProvided() throws Exception {
         JmsDestination dest = new JmsQueue("explicitDestination");
-        MessageProducer producer = session.createProducer(new JmsQueue());
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
 
         Message message = Mockito.mock(Message.class);
         try {
@@ -138,16 +180,30 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
         }
 
         try {
+            producer.send(dest, message, completionListener);
+            fail("Expected exception not thrown");
+        } catch (UnsupportedOperationException uoe) {
+            // expected
+        }
+
+        try {
             producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
             fail("Expected exception not thrown");
         } catch (UnsupportedOperationException uoe) {
             // expected
         }
+
+        try {
+            producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+            fail("Expected exception not thrown");
+        } catch (UnsupportedOperationException uoe) {
+            // expected
+        }
     }
 
     @Test(timeout = 10000)
     public void testAnonymousDestinationProducerThrowsIDEWhenNullDestinationIsProvided() throws Exception {
-        MessageProducer producer = session.createProducer(null);
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
 
         Message message = Mockito.mock(Message.class);
         try {
@@ -158,10 +214,369 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
         }
 
         try {
+            producer.send(null, message, completionListener);
+            fail("Expected exception not thrown");
+        } catch (InvalidDestinationException ide) {
+            // expected
+        }
+
+        try {
             producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
             fail("Expected exception not thrown");
         } catch (InvalidDestinationException ide) {
             // expected
         }
+
+        try {
+            producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+            fail("Expected exception not thrown");
+        } catch (InvalidDestinationException ide) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testAnonymousProducerThrowsIAEWhenNullCompletionListenerProvided() throws Exception {
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
+        JmsDestination dest = new JmsQueue("explicitDestination");
+
+        Message message = Mockito.mock(Message.class);
+
+        try {
+            producer.send(dest, message, null);
+            fail("Expected exception not thrown");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+
+        try {
+            producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
+            fail("Expected exception not thrown");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testExplicitProducerThrowsIAEWhenNullCompletionListenerIsProvided() throws Exception {
+        JmsDestination dest = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
+
+        Message message = Mockito.mock(Message.class);
+        try {
+            producer.send(message, null);
+            fail("Expected exception not thrown");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+
+        try {
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
+            fail("Expected exception not thrown");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testInOrderSendAcksCompletionsReturnInOrder() throws Exception {
+        final int MESSAGE_COUNT = 3;
+
+        final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "mock://localhost?mock.delayCompletionCalls=true");
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final Destination destination = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+        final MyCompletionListener listener = new MyCompletionListener();
+
+        sendMessages(MESSAGE_COUNT, producer, listener);
+
+        assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+            }
+        }));
+
+        remotePoor.completeAllPendingSends(destination);
+
+        assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getCompletedSends().size() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
+
+        connection.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testReversedOrderSendAcksCompletionsReturnInOrder() throws Exception {
+        final int MESSAGE_COUNT = 3;
+
+        final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "mock://localhost?mock.delayCompletionCalls=true");
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final Destination destination = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+        final MyCompletionListener listener = new MyCompletionListener();
+
+        sendMessages(MESSAGE_COUNT, producer, listener);
+
+        assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+            }
+        }));
+
+        List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+        assertEquals(MESSAGE_COUNT, pending.size());
+        Collections.reverse(pending);
+
+        for (JmsOutboundMessageDispatch envelope : pending) {
+            LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
+            remotePoor.completePendingSend(envelope);
+        }
+
+        assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getCompletedSends().size() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
+
+        connection.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testInOrderSendFailuresCompletionsReturnInOrder() throws Exception {
+        final int MESSAGE_COUNT = 3;
+
+        final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "mock://localhost?mock.delayCompletionCalls=true");
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final Destination destination = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+        final MyCompletionListener listener = new MyCompletionListener();
+
+        sendMessages(MESSAGE_COUNT, producer, listener);
+        assertTrue("Not all messages sent", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+            }
+        }));
+        remotePoor.failAllPendingSends(destination, new JMSException("Could not send message"));
+
+        assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getFailedSends().size() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertMessageFailedInOrder(MESSAGE_COUNT, listener);
+
+        connection.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testReversedOrderSendAcksFailuresReturnInOrder() throws Exception {
+        final int MESSAGE_COUNT = 3;
+
+        final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "mock://localhost?mock.delayCompletionCalls=true");
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final Destination destination = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+        final MyCompletionListener listener = new MyCompletionListener();
+
+        sendMessages(MESSAGE_COUNT, producer, listener);
+
+        assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+            }
+        }));
+
+        List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+        assertEquals(MESSAGE_COUNT, pending.size());
+        Collections.reverse(pending);
+
+        for (JmsOutboundMessageDispatch envelope : pending) {
+            LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
+            remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
+        }
+
+        assertTrue("Not all failures triggered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getFailedSends().size() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertMessageFailedInOrder(MESSAGE_COUNT, listener);
+
+        connection.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testInterleavedCompletionsReturnedInOrder() throws Exception {
+        final int MESSAGE_COUNT = 3;
+
+        final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "mock://localhost?mock.delayCompletionCalls=true");
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final Destination destination = new JmsQueue("explicitDestination");
+        JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+        final MyCompletionListener listener = new MyCompletionListener();
+
+        sendMessages(MESSAGE_COUNT, producer, listener);
+
+        assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+            }
+        }));
+
+        List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+        assertEquals(MESSAGE_COUNT, pending.size());
+        Collections.reverse(pending);
+
+        for (JmsOutboundMessageDispatch envelope : pending) {
+            int sequence = envelope.getMessage().getIntProperty("sequence");
+            if (sequence % 2 == 0) {
+                LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
+                remotePoor.completePendingSend(envelope);
+            } else {
+                LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
+                remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
+            }
+        }
+
+        assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getCombinedSends().size() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertTotalCompletionOrder(MESSAGE_COUNT, listener);
+
+        connection.close();
+    }
+
+    private void sendMessages(int count, JmsMessageProducer producer, MyCompletionListener listener) throws Exception {
+        for (int i = 0; i < count; ++i) {
+            Message message = session.createMessage();
+            message.setIntProperty("sequence", i);
+
+            producer.send(message, listener);
+        }
+    }
+
+    private void assertMessageCompletedInOrder(int expected, MyCompletionListener listener) throws Exception {
+        assertEquals("Did not get expected number of completions", expected, listener.completed.size());
+        for (int i = 0; i < listener.completed.size(); ++i) {
+            int sequence = listener.completed.get(i).getIntProperty("sequence");
+            assertEquals("Did not complete expected message: " + i + " got: " + sequence, i, sequence);
+        }
+    }
+
+    private void assertMessageFailedInOrder(int expected, MyCompletionListener listener) throws Exception {
+        assertEquals("Did not get expected number of failures", expected, listener.failed.size());
+        for (int i = 0; i < listener.failed.size(); ++i) {
+            int sequence = listener.failed.get(i).getIntProperty("sequence");
+            assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
+        }
+    }
+
+    private void assertTotalCompletionOrder(int expected, MyCompletionListener listener) throws Exception {
+        assertEquals("Did not get expected number of failures", expected, listener.combinedResult.size());
+        for (int i = 0; i < listener.combinedResult.size(); ++i) {
+            int sequence = listener.combinedResult.get(i).getIntProperty("sequence");
+            assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
+        }
+    }
+
+    private class MyCompletionListener implements JmsCompletionListener {
+
+        private final List<Message> completed = new ArrayList<Message>();
+        private final List<Message> failed = new ArrayList<Message>();
+        private final List<Message> combinedResult = new ArrayList<Message>();
+
+        @Override
+        public void onCompletion(Message message) {
+            try {
+                LOG.debug("Recording completed send: {}", message.getJMSMessageID());
+            } catch (JMSException e) {
+            }
+            completed.add(message);
+            combinedResult.add(message);
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            try {
+                LOG.debug("Recording failed send: {} -> error {}", message.getJMSMessageID(), exception.getMessage());
+            } catch (JMSException e) {
+            }
+            failed.add(message);
+            combinedResult.add(message);
+        }
+
+        public List<Message> getCombinedSends() {
+            return combinedResult;
+        }
+
+        public List<Message> getCompletedSends() {
+            return completed;
+        }
+
+        public List<Message> getFailedSends() {
+            return failed;
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org