You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/09 20:25:42 UTC

[1/2] qpid-jms git commit: QPIDJMS-416 Run all AmqpProvider work on netty event loop thread

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4b8739b75 -> 4314482de


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index f463971..469be6e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
@@ -29,6 +30,7 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -36,8 +38,12 @@ import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.apache.qpid.jms.util.Repeat;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +59,7 @@ import io.netty.util.ResourceLeakDetector.Level;
 /**
  * Test basic functionality of the Netty based TCP transport.
  */
+@RunWith(QpidJMSTestRunner.class)
 public class NettyTcpTransportTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
@@ -91,6 +98,42 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 60000)
+    public void testConnectWithCustomThreadFactoryConfigured() throws Exception {
+        // A thread factory set before connect must be retained; the test expects a later change to be rejected.
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            server.start();
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            QpidJMSThreadFactory factory = new QpidJMSThreadFactory("NettyTransportTest", true);
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            transport.setThreadFactory(factory);
+            // The echo server was started above, so this connect attempt is expected to succeed.
+            try {
+                transport.connect(null, null);
+            } catch (Exception e) {
+                LOG.info("Unexpectedly failed to connect to: {}", serverLocation, e);
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(transport.isConnected());
+            assertSame(factory, transport.getThreadFactory());
+            // Without the fail() below the test would pass even if no IllegalStateException were thrown.
+            try {
+                transport.setThreadFactory(factory);
+                fail("Should not be able to set the thread factory once connected");
+            } catch (IllegalStateException expected) {
+                LOG.trace("Caught expected state exception");
+            }
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
     @Test(timeout = 60 * 1000)
     public void testConnectWithoutRunningServer() throws Exception {
         try (NettyEchoServer server = createEchoServer(createServerOptions())) {
@@ -103,7 +146,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 fail("Should have failed to connect to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -129,7 +172,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, null, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 fail("Should have failed to connect to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -155,7 +198,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             assertNotNull(transport.getTransportListener());
 
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should not have failed to connect to the server at " + serverLocation + " but got exception: " + e);
@@ -177,7 +220,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -212,7 +255,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             for (int i = 0; i < CONNECTION_COUNT; ++i) {
                 Transport transport = createTransport(serverLocation, testListener, createClientOptions());
                 try {
-                    transport.connect(null);
+                    transport.connect(null, null);
                     assertTrue(transport.isConnected());
                     LOG.info("Connected to server:{} as expected.", serverLocation);
                     transports.add(transport);
@@ -252,7 +295,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             for (int i = 0; i < CONNECTION_COUNT; ++i) {
                 Transport transport = createTransport(serverLocation, testListener, createClientOptions());
                 try {
-                    transport.connect(null);
+                    transport.connect(null, null);
                     transport.writeAndFlush(sendBuffer.copy());
                     transports.add(transport);
                 } catch (Exception e) {
@@ -288,7 +331,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -326,7 +369,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -354,7 +397,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -405,7 +448,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -448,7 +491,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -467,6 +510,63 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 60000)
+    public void testConnectRunsInitializationMethod() throws Exception {
+        // Checks that the initialization routine handed to connect() has been run once connect returns.
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            server.start();
+
+            final URI serverLocation = new URI("tcp://localhost:" + server.getServerPort());
+            final AtomicBoolean initRoutineRan = new AtomicBoolean();
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                transport.connect(() -> initRoutineRan.set(true), null);
+                LOG.info("Connected to server:{} as expected.", serverLocation);
+            } catch (Exception cause) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + cause);
+            }
+
+            assertTrue(transport.isConnected());
+            assertEquals(serverLocation, transport.getRemoteLocation());
+            assertTrue(initRoutineRan.get());
+
+            transport.close();
+        }
+
+        assertFalse(transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
+    public void testFailureInInitializationRoutineFailsConnect() throws Exception {
+        // An initialization routine that throws is expected to fail connect() and leave the transport unconnected.
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            server.start();
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                transport.connect(() -> { throw new RuntimeException(); }, null);
+                fail("Should not have connected to the server at " + serverLocation);
+            } catch (Exception e) {
+                LOG.info("Failed to connect to server:{} as expected", serverLocation);
+            }
+
+            assertFalse("Should not be connected", transport.isConnected());
+            assertEquals("Server location is incorrect", serverLocation, transport.getRemoteLocation());
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
     @Ignore("Used for checking for transport level leaks, my be unstable on CI.")
     @Test(timeout = 60 * 1000)
     public void testSendToClosedTransportFailsButDoesNotLeak() throws Exception {
@@ -483,7 +583,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             for (int i = 0; i < 256; ++i) {
                 transport = createTransport(serverLocation, testListener, createClientOptions());
                 try {
-                    transport.connect(null);
+                    transport.connect(null, null);
                     LOG.info("Connected to server:{} as expected.", serverLocation);
                 } catch (Exception e) {
                     fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -531,7 +631,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             options.setUseKQueue(false);
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -601,7 +701,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             options.setUseEpoll(false);
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index ead740c..2ea873d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -75,7 +75,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -110,7 +110,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 fail("Should have failed to connect to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -149,7 +149,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
             try {
                 // The transport should allow for the size of data we sent.
                 transport.setMaxFrameSize(FRAME_SIZE);
-                transport.connect(null);
+                transport.connect(null, null);
                 transports.add(transport);
                 transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
@@ -200,7 +200,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
             Transport transport = createTransport(serverLocation, wsListener, createClientOptions);
             try {
                 transport.setMaxFrameSize(FRAME_SIZE);
-                transport.connect(null);
+                transport.connect(null, null);
                 transports.add(transport);
                 transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
@@ -261,7 +261,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 // Transport can't receive anything bigger so it should fail the connection
                 // when data arrives that is larger than this value.
                 transport.setMaxFrameSize(FRAME_SIZE / 2);
-                transport.connect(null);
+                transport.connect(null, null);
                 transports.add(transport);
                 transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
@@ -297,7 +297,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
             try {
                 // Transport allows bigger frames in so that server is the one causing the failure.
                 transport.setMaxFrameSize(FRAME_SIZE);
-                transport.connect(null);
+                transport.connect(null, null);
                 transports.add(transport);
                 transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
@@ -358,7 +358,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-416 Run all AmqpProvider work on netty event loop thread

Posted by ta...@apache.org.
QPIDJMS-416 Run all AmqpProvider work on netty event loop thread

Move from a separate provider executor to using the netty event loop for
AMQP protocol handling.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4314482d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4314482d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4314482d

Branch: refs/heads/master
Commit: 4314482de1e8cb7e58d8331ca0e459f32d78ab4e
Parents: 4b8739b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 9 16:25:23 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 9 16:25:23 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 969 +++++++++----------
 .../apache/qpid/jms/transports/Transport.java   |  26 +-
 .../jms/transports/netty/NettyTcpTransport.java |  85 +-
 .../jms/provider/amqp/AmqpProviderTest.java     | 104 +-
 .../netty/NettyOpenSslTransportTest.java        |   4 +-
 .../transports/netty/NettySslTransportTest.java |  12 +-
 .../transports/netty/NettyTcpTransportTest.java | 128 ++-
 .../transports/netty/NettyWsTransportTest.java  |  14 +-
 8 files changed, 788 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index cc3915d..dbdc977 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.security.ProviderException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -26,9 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -72,7 +74,6 @@ import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.jms.util.PropertyUtil;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
-import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
@@ -140,7 +141,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private final URI remoteURI;
     private final AtomicBoolean closed = new AtomicBoolean();
     private volatile Throwable failureCause;
-    private ScheduledThreadPoolExecutor serializer;
+    private ScheduledExecutorService serializer;
     private final org.apache.qpid.proton.engine.Transport protonTransport =
         org.apache.qpid.proton.engine.Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
@@ -164,127 +165,130 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         this.remoteURI = remoteURI;
         this.transport = transport;
         this.futureFactory = futureFactory;
-
-        serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory(
-            "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
-            remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true));
-
-        serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-        serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     }
 
     @Override
     public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
         checkClosedOrFailed();
 
+        if (serializer != null) {
+            throw new IllegalStateException("Connect cannot be called more than once");
+        }
+
         final ProviderFuture connectRequest = futureFactory.createFuture();
 
-        serializer.execute(new Runnable() {
+        // Configure Transport prior to initialization at which point configuration is set and
+        // cannot be updated.  All further interaction should take place on the serializer for
+        // thread safety.
 
-            @Override
-            public void run() {
+        ThreadFactory transportThreadFactory = new QpidJMSThreadFactory(
+                "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
+                remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true);
 
-                connectionRequest = connectRequest;
-                AmqpProvider.this.connectionInfo = connectionInfo;
+        transport.setThreadFactory(transportThreadFactory);
+        transport.setTransportListener(AmqpProvider.this);
+        transport.setMaxFrameSize(maxFrameSize);
 
-                try {
-                    protonTransport.setEmitFlowEventOnSend(false);
-
-                    try {
-                        ((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
-                    } catch (NoSuchMethodError nsme) {
-                        // using a version at runtime where the optimisation isn't available, ignore
-                        LOG.trace("Proton output buffer optimisation unavailable");
-                    }
+        final SSLContext sslContextOverride;
+        if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
+            sslContextOverride =
+                (SSLContext) connectionInfo.getExtensionMap().get(
+                    JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
+        } else {
+            sslContextOverride = null;
+        }
 
-                    if (getMaxFrameSize() > 0) {
-                        protonTransport.setMaxFrameSize(getMaxFrameSize());
-                        protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
-                    }
+        if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
+            @SuppressWarnings({ "unchecked" })
+            Map<String, String> headers = (Map<String, String>)
+                connectionInfo.getExtensionMap().get(
+                    JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
+            if (headers != null) {
+                transport.getTransportOptions().getHttpHeaders().putAll(headers);
+            }
+        }
 
-                    protonTransport.setChannelMax(getChannelMax());
-                    protonTransport.setIdleTimeout(idleTimeout);
-                    protonTransport.bind(protonConnection);
-                    protonConnection.collect(protonCollector);
-
-                    final SSLContext sslContextOverride;
-                    if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
-                        sslContextOverride =
-                            (SSLContext) connectionInfo.getExtensionMap().get(
-                                JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
-                    } else {
-                        sslContextOverride = null;
-                    }
+        try {
+            serializer = transport.connect(() -> {
+                this.connectionInfo = connectionInfo;
+                this.connectionRequest = connectRequest;
 
-                    transport.setTransportListener(AmqpProvider.this);
-                    transport.setMaxFrameSize(maxFrameSize);
+                protonTransport.setEmitFlowEventOnSend(false);
 
-                    if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
-                        @SuppressWarnings({ "unchecked" })
-                        Map<String, String> headers = (Map<String, String>)
-                            connectionInfo.getExtensionMap().get(
-                                JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
-                        if (headers != null) {
-                            transport.getTransportOptions().getHttpHeaders().putAll(headers);
-                        }
-                    }
+                try {
+                    ((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
+                } catch (NoSuchMethodError nsme) {
+                    // using a version at runtime where the optimisation isn't available, ignore
+                    LOG.trace("Proton output buffer optimisation unavailable");
+                }
 
-                    transport.connect(sslContextOverride);
+                if (getMaxFrameSize() > 0) {
+                    protonTransport.setMaxFrameSize(getMaxFrameSize());
+                    protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
+                }
 
-                    if (saslLayer) {
-                        Sasl sasl = protonTransport.sasl();
-                        sasl.client();
+                protonTransport.setChannelMax(getChannelMax());
+                protonTransport.setIdleTimeout(idleTimeout);
+                protonTransport.bind(protonConnection);
+                protonConnection.collect(protonCollector);
 
-                        String hostname = getVhost();
-                        if (hostname == null) {
-                            hostname = remoteURI.getHost();
-                        } else if (hostname.isEmpty()) {
-                            hostname = null;
-                        }
+                if (saslLayer) {
+                    Sasl sasl = protonTransport.sasl();
+                    sasl.client();
 
-                        sasl.setRemoteHostname(hostname);
-                        sasl.setListener(new SaslListener() {
+                    String hostname = getVhost();
+                    if (hostname == null) {
+                        hostname = remoteURI.getHost();
+                    } else if (hostname.isEmpty()) {
+                        hostname = null;
+                    }
 
-                            @Override
-                            public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
-                                authenticator.handleSaslMechanisms(sasl, transport);
-                                checkSaslAuthenticationState();
-                            }
+                    sasl.setRemoteHostname(hostname);
+                    sasl.setListener(new SaslListener() {
 
-                            @Override
-                            public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
-                                authenticator.handleSaslChallenge(sasl, transport);
-                                checkSaslAuthenticationState();
-                            }
+                        @Override
+                        public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+                            authenticator.handleSaslMechanisms(sasl, transport);
+                            checkSaslAuthenticationState();
+                        }
 
-                            @Override
-                            public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
-                                authenticator.handleSaslOutcome(sasl, transport);
-                                checkSaslAuthenticationState();
-                            }
+                        @Override
+                        public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+                            authenticator.handleSaslChallenge(sasl, transport);
+                            checkSaslAuthenticationState();
+                        }
 
-                            @Override
-                            public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
-                                // Server only event
-                            }
+                        @Override
+                        public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+                            authenticator.handleSaslOutcome(sasl, transport);
+                            checkSaslAuthenticationState();
+                        }
 
-                            @Override
-                            public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
-                                // Server only event
-                            }
-                        });
+                        @Override
+                        public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+                            // Server only event
+                        }
 
-                        authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
+                        @Override
+                        public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+                            // Server only event
+                        }
+                    });
 
-                        pumpToProtonTransport();
-                    } else {
-                        connectRequest.onSuccess();
-                    }
-                } catch (Throwable t) {
-                    connectionRequest.onFailure(IOExceptionSupport.create(t));
+                    authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
                 }
+            }, sslContextOverride);
+
+            // Once connected pump the transport to write the header and respond to any
+            // data that arrived at connect such as pipelined Header etc
+            serializer.execute(() -> pumpToProtonTransport());
+
+            if (!saslLayer) {
+                connectRequest.onSuccess();
             }
-        });
+        } catch (Throwable t) {
+            connectRequest.onFailure(IOExceptionSupport.create(t));
+        }
 
         if (connectionInfo.getConnectTimeout() != JmsConnectionInfo.INFINITE) {
             if (!connectRequest.sync(connectionInfo.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
@@ -309,52 +313,61 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         if (closed.compareAndSet(false, true)) {
             final ProviderFuture request = futureFactory.createUnfailableFuture();
 
-            serializer.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        // If we are not connected then there is nothing we can do now
-                        // just signal success.
-                        if (transport == null || !transport.isConnected()) {
-                            request.onSuccess();
-                            return;
-                        }
-
-                        if (connection != null) {
-                            connection.close(request);
-                        } else {
-                            // If the SASL authentication occurred but failed then we don't
-                            // need to do an open / close
-                            if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
+            // It is possible that the connect call failed before calling transport connect, or that
+            // it failed and shut down the event loop, in which case we have no work to do other than
+            // to clean up the transport by closing it down.
+            if (serializer != null && !serializer.isShutdown()) {
+                try {
+                    serializer.execute(() -> {
+                        try {
+                            // If we are not connected then there is nothing we can do now
+                            // just signal success.
+                            if (transport == null || !transport.isConnected()) {
                                 request.onSuccess();
                                 return;
                             }
 
-                            // Connection attempt might have been tried and failed so only perform
-                            // an open / close cycle if one hasn't been done already.
-                            if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
-                                AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
-                                builder.buildResource(request);
-
-                                protonConnection.setContext(builder);
+                            if (connection != null) {
+                                connection.close(request);
                             } else {
-                                request.onSuccess();
+                                // If the SASL authentication occurred but failed then we don't
+                                // need to do an open / close
+                                if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
+                                    request.onSuccess();
+                                    return;
+                                }
+
+                                // Connection attempt might have been tried and failed so only perform
+                                // an open / close cycle if one hasn't been done already.
+                                if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
+                                    AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
+                                    builder.buildResource(request);
+                                    protonConnection.setContext(builder);
+                                } else {
+                                    request.onSuccess();
+                                }
                             }
-                        }
 
-                        pumpToProtonTransport(request);
-                    } catch (Exception e) {
-                        LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
-                    } finally {
-                        if (nextIdleTimeoutCheck != null) {
-                            LOG.trace("Cancelling scheduled IdleTimeoutCheck");
-                            nextIdleTimeoutCheck.cancel(false);
-                            nextIdleTimeoutCheck = null;
+                            pumpToProtonTransport(request);
+                        } catch (Exception e) {
+                            LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
+                        } finally {
+                            if (nextIdleTimeoutCheck != null) {
+                                LOG.trace("Cancelling scheduled IdleTimeoutCheck");
+                                nextIdleTimeoutCheck.cancel(false);
+                                nextIdleTimeoutCheck = null;
+                            }
                         }
-                    }
+                    });
+                } catch (RejectedExecutionException rje) {
+                    // Transport likely encountered some critical error on connect and the executor
+                    // is no longer accepting tasks, in which case just ignore and continue on.
+                    LOG.trace("Close of provider resources was rejected from Transport IO thread: ", rje);
+                    request.onSuccess();
                 }
-            });
+            } else {
+                request.onSuccess();
+            }
 
             try {
                 if (getCloseTimeout() < 0) {
@@ -365,16 +378,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             } catch (IOException e) {
                 LOG.warn("Error caught while closing Provider: {}", e.getMessage() != null ? e.getMessage() : "<Unknown Error>");
             } finally {
-                try {
-                    if (transport != null) {
-                        try {
-                            transport.close();
-                        } catch (Exception e) {
-                            LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
-                        }
+                if (transport != null) {
+                    try {
+                        transport.close();
+                    } catch (Exception e) {
+                        LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
                     }
-                } finally {
-                    ThreadPoolUtils.shutdownGraceful(serializer);
                 }
             }
         }
@@ -383,83 +392,82 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    resource.visit(new JmsResourceVistor() {
-                        @Override
-                        public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
-                            connection.createSession(sessionInfo, request);
-                        }
+        serializer.execute(() -> {
 
-                        @Override
-                        public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(producerInfo.getParentId());
-                            session.createProducer(producerInfo, request);
-                        }
+            try {
+                checkClosedOrFailed();
+                resource.visit(new JmsResourceVistor() {
+                    @Override
+                    public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+                        connection.createSession(sessionInfo, request);
+                    }
 
-                        @Override
-                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
-                            final AmqpSession session;
+                    @Override
+                    public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+                        AmqpSession session = connection.getSession(producerInfo.getParentId());
+                        session.createProducer(producerInfo, request);
+                    }
 
-                            if (consumerInfo.isConnectionConsumer()) {
-                                session = connection.getConnectionSession();
-                            } else {
-                                session = connection.getSession(consumerInfo.getParentId());
-                            }
+                    @Override
+                    public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                        final AmqpSession session;
 
-                            session.createConsumer(consumerInfo, request);
+                        if (consumerInfo.isConnectionConsumer()) {
+                            session = connection.getConnectionSession();
+                        } else {
+                            session = connection.getSession(consumerInfo.getParentId());
                         }
 
-                        @Override
-                        public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
-                            AmqpProvider.this.connectionInfo = connectionInfo;
-
-                            AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
-                            connectionRequest = new AsyncResult() {
-                                @Override
-                                public void onSuccess() {
-                                    fireConnectionEstablished();
-                                    request.onSuccess();
-                                }
+                        session.createConsumer(consumerInfo, request);
+                    }
 
-                                @Override
-                                public void onFailure(Throwable result) {
-                                    request.onFailure(result);
-                                }
+                    @Override
+                    public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+                        AmqpProvider.this.connectionInfo = connectionInfo;
 
-                                @Override
-                                public boolean isComplete() {
-                                    return request.isComplete();
-                                }
-                            };
+                        AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
+                        connectionRequest = new AsyncResult() {
+                            @Override
+                            public void onSuccess() {
+                                fireConnectionEstablished();
+                                request.onSuccess();
+                            }
 
-                            builder.buildResource(connectionRequest);
-                        }
+                            @Override
+                            public void onFailure(Throwable result) {
+                                request.onFailure(result);
+                            }
 
-                        @Override
-                        public void processDestination(JmsTemporaryDestination destination) throws Exception {
-                            if (destination.isTemporary()) {
-                                connection.createTemporaryDestination(destination, request);
-                            } else {
-                                request.onSuccess();
+                            @Override
+                            public boolean isComplete() {
+                                return request.isComplete();
                             }
-                        }
+                        };
 
-                        @Override
-                        public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
-                            AmqpSession session = connection.getSession(transactionInfo.getSessionId());
-                            session.begin(transactionInfo.getId(), request);
+                        builder.buildResource(connectionRequest);
+                    }
+
+                    @Override
+                    public void processDestination(JmsTemporaryDestination destination) throws Exception {
+                        if (destination.isTemporary()) {
+                            connection.createTemporaryDestination(destination, request);
+                        } else {
+                            request.onSuccess();
                         }
-                    });
+                    }
 
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+                    @Override
+                    public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
+                        AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+                        session.begin(transactionInfo.getId(), request);
+                    }
+                });
+
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -467,26 +475,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void start(final JmsResource resource, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    resource.visit(new JmsDefaultResourceVisitor() {
+        serializer.execute(() -> {
 
-                        @Override
-                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(consumerInfo.getParentId());
-                            AmqpConsumer consumer = session.getConsumer(consumerInfo);
-                            consumer.start(request);
-                        }
-                    });
+            try {
+                checkClosedOrFailed();
+                resource.visit(new JmsDefaultResourceVisitor() {
+
+                    @Override
+                    public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                        AmqpSession session = connection.getSession(consumerInfo.getParentId());
+                        AmqpConsumer consumer = session.getConsumer(consumerInfo);
+                        consumer.start(request);
+                    }
+                });
 
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -494,26 +501,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void stop(final JmsResource resource, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    resource.visit(new JmsDefaultResourceVisitor() {
+        serializer.execute(() -> {
 
-                        @Override
-                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(consumerInfo.getParentId());
-                            AmqpConsumer consumer = session.getConsumer(consumerInfo);
-                            consumer.stop(request);
-                        }
-                    });
+            try {
+                checkClosedOrFailed();
+                resource.visit(new JmsDefaultResourceVisitor() {
+
+                    @Override
+                    public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                        AmqpSession session = connection.getSession(consumerInfo.getParentId());
+                        AmqpConsumer consumer = session.getConsumer(consumerInfo);
+                        consumer.stop(request);
+                    }
+                });
 
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -521,101 +527,100 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void destroy(final JmsResource resource, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    resource.visit(new JmsDefaultResourceVisitor() {
+        serializer.execute(() -> {
 
-                        @Override
-                        public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
-                            final AmqpSession session = connection.getSession(sessionInfo.getId());
-                            session.close(new AsyncResult() {
-                                // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
-                                @Override
-                                public void onSuccess() {
-                                    onComplete();
-                                    request.onSuccess();
-                                }
+            try {
+                checkClosedOrFailed();
+                resource.visit(new JmsDefaultResourceVisitor() {
+
+                    @Override
+                    public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+                        final AmqpSession session = connection.getSession(sessionInfo.getId());
+                        session.close(new AsyncResult() {
+                            // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
+                            @Override
+                            public void onSuccess() {
+                                onComplete();
+                                request.onSuccess();
+                            }
 
-                                @Override
-                                public void onFailure(Throwable result) {
-                                    onComplete();
-                                    request.onFailure(result);
-                                }
+                            @Override
+                            public void onFailure(Throwable result) {
+                                onComplete();
+                                request.onFailure(result);
+                            }
 
-                                @Override
-                                public boolean isComplete() {
-                                    return request.isComplete();
-                                }
+                            @Override
+                            public boolean isComplete() {
+                                return request.isComplete();
+                            }
 
-                                void onComplete() {
-                                    // Mark the sessions resources closed, which in turn calls
-                                    // the subscription cleanup.
-                                    session.handleResourceClosure(AmqpProvider.this, null);
-                                }
-                            });
-                        }
+                            void onComplete() {
+                                // Mark the sessions resources closed, which in turn calls
+                                // the subscription cleanup.
+                                session.handleResourceClosure(AmqpProvider.this, null);
+                            }
+                        });
+                    }
 
-                        @Override
-                        public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(producerInfo.getParentId());
-                            AmqpProducer producer = session.getProducer(producerInfo);
-                            producer.close(request);
-                        }
+                    @Override
+                    public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+                        AmqpSession session = connection.getSession(producerInfo.getParentId());
+                        AmqpProducer producer = session.getProducer(producerInfo);
+                        producer.close(request);
+                    }
 
-                        @Override
-                        public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(consumerInfo.getParentId());
-                            final AmqpConsumer consumer = session.getConsumer(consumerInfo);
-                            consumer.close(new AsyncResult() {
-                                // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
-                                @Override
-                                public void onSuccess() {
-                                    onComplete();
-                                    request.onSuccess();
-                                }
+                    @Override
+                    public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
+                        AmqpSession session = connection.getSession(consumerInfo.getParentId());
+                        final AmqpConsumer consumer = session.getConsumer(consumerInfo);
+                        consumer.close(new AsyncResult() {
+                            // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
+                            @Override
+                            public void onSuccess() {
+                                onComplete();
+                                request.onSuccess();
+                            }
 
-                                @Override
-                                public void onFailure(Throwable result) {
-                                    onComplete();
-                                    request.onFailure(result);
-                                }
+                            @Override
+                            public void onFailure(Throwable result) {
+                                onComplete();
+                                request.onFailure(result);
+                            }
 
-                                @Override
-                                public boolean isComplete() {
-                                    return request.isComplete();
-                                }
+                            @Override
+                            public boolean isComplete() {
+                                return request.isComplete();
+                            }
 
-                                void onComplete() {
-                                    connection.getSubTracker().consumerRemoved(consumerInfo);
-                                }
-                            });
-                        }
+                            void onComplete() {
+                                connection.getSubTracker().consumerRemoved(consumerInfo);
+                            }
+                        });
+                    }
 
-                        @Override
-                        public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
-                            connection.close(request);
-                        }
+                    @Override
+                    public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+                        connection.close(request);
+                    }
 
-                        @Override
-                        public void processDestination(JmsTemporaryDestination destination) throws Exception {
-                            AmqpTemporaryDestination temporary = connection.getTemporaryDestination(destination);
-                            if (temporary != null) {
-                                temporary.close(request);
-                            } else {
-                                LOG.debug("Could not find temporary destination {} to delete.", destination);
-                                request.onSuccess();
-                            }
+                    @Override
+                    public void processDestination(JmsTemporaryDestination destination) throws Exception {
+                        AmqpTemporaryDestination temporary = connection.getTemporaryDestination(destination);
+                        if (temporary != null) {
+                            temporary.close(request);
+                        } else {
+                            LOG.debug("Could not find temporary destination {} to delete.", destination);
+                            request.onSuccess();
                         }
-                    });
+                    }
+                });
 
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -623,18 +628,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    JmsProducerId producerId = envelope.getProducerId();
-                    AmqpProducer producer = (AmqpProducer) producerId.getProviderHint();
-                    producer.send(envelope, request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                JmsProducerId producerId = envelope.getProducerId();
+                AmqpProducer producer = (AmqpProducer) producerId.getProviderHint();
+                producer.send(envelope, request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -642,19 +646,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    AmqpSession amqpSession = connection.getSession(sessionId);
-                    amqpSession.acknowledge(ackType);
-                    pumpToProtonTransport(request);
-                    request.onSuccess();
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                AmqpSession amqpSession = connection.getSession(sessionId);
+                amqpSession.acknowledge(ackType);
+                pumpToProtonTransport(request);
+                request.onSuccess();
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -662,29 +665,28 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
+        serializer.execute(() -> {
 
-                    JmsConsumerId consumerId = envelope.getConsumerId();
-                    AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
+            try {
+                checkClosedOrFailed();
 
-                    consumer.acknowledge(envelope, ackType);
+                JmsConsumerId consumerId = envelope.getConsumerId();
+                AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
 
-                    if (consumer.getSession().isAsyncAck()) {
-                        request.onSuccess();
-                        pumpToProtonTransport(request);
-                    } else {
-                        pumpToProtonTransport(request, false);
-                        request.onSuccess();
-                        transport.flush();
-                    }
-                } catch (Throwable t) {
-                    request.onFailure(t);
+                consumer.acknowledge(envelope, ackType);
+
+                if (consumer.getSession().isAsyncAck()) {
+                    request.onSuccess();
+                    pumpToProtonTransport(request);
+                } else {
+                    pumpToProtonTransport(request, false);
+                    request.onSuccess();
+                    transport.flush();
                 }
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -692,18 +694,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    AmqpSession session = connection.getSession(transactionInfo.getSessionId());
-                    session.commit(transactionInfo, nextTransactionId, request);
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+                session.commit(transactionInfo, nextTransactionId, request);
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -711,18 +712,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void rollback(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    AmqpSession session = connection.getSession(transactionInfo.getSessionId());
-                    session.rollback(transactionInfo, nextTransactionId, request);
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+                session.rollback(transactionInfo, nextTransactionId, request);
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -730,19 +730,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    AmqpSession session = connection.getSession(sessionId);
-                    session.recover();
-                    pumpToProtonTransport(request);
-                    request.onSuccess();
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                AmqpSession session = connection.getSession(sessionId);
+                session.recover();
+                pumpToProtonTransport(request);
+                request.onSuccess();
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -750,17 +749,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void unsubscribe(final String subscription, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    connection.unsubscribe(subscription, request);
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                connection.unsubscribe(subscription, request);
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -768,18 +766,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException {
         checkClosedOrFailed();
-        serializer.execute(new Runnable() {
+        checkConnected();
 
-            @Override
-            public void run() {
-                try {
-                    checkClosedOrFailed();
-                    AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
-                    consumer.pull(timeout, request);
-                    pumpToProtonTransport(request);
-                } catch (Throwable t) {
-                    request.onFailure(t);
-                }
+        serializer.execute(() -> {
+
+            try {
+                checkClosedOrFailed();
+                AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
+                consumer.pull(timeout, request);
+                pumpToProtonTransport(request);
+            } catch (Throwable t) {
+                request.onFailure(t);
             }
         });
     }
@@ -792,44 +789,46 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    @Override
-    public void onData(final ByteBuf input) {
-
-        // We need to retain until the serializer gets around to processing it.
-        input.retain();
-
-        serializer.execute(new Runnable() {
-
-            @Override
-            public void run() {
+    public void scheduleExecuteAndPump(Runnable task) {
+        serializer.execute(() -> {
+            try {
                 try {
-                    if (isTraceBytes()) {
-                        TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
-                    }
-
-                    do {
-                        ByteBuffer buffer = protonTransport.tail();
-                        int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
-                        buffer.limit(buffer.position() + chunkSize);
-                        input.readBytes(buffer);
-                        protonTransport.process();
-                    } while (input.isReadable());
-
-                    // Free for pooled memory to be put back now.
-                    input.release();
-
-                    // Process the state changes from the latest data and then answer back
-                    // any pending updates to the Broker.
-                    processUpdates();
+                    task.run();
+                } finally {
                     pumpToProtonTransport();
-                } catch (Throwable t) {
-                    LOG.warn("Caught problem during data processing: {}", t.getMessage(), t);
-                    fireProviderException(t);
                 }
+            } catch (Throwable t) {
+                LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
+                fireProviderException(t);
             }
         });
     }
 
+    @Override
+    public void onData(final ByteBuf input) {
+        try {
+            if (isTraceBytes()) {
+                TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
+            }
+
+            do {
+                ByteBuffer buffer = protonTransport.tail();
+                int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
+                buffer.limit(buffer.position() + chunkSize);
+                input.readBytes(buffer);
+                protonTransport.process();
+            } while (input.isReadable());
+
+            // Process the state changes from the latest data and then answer back
+            // any pending updates to the Broker.
+            processUpdates();
+            pumpToProtonTransport();
+        } catch (Throwable t) {
+            LOG.warn("Caught problem during data processing: {}", t.getMessage(), t);
+            fireProviderException(t);
+        }
+    }
+
     /**
      * Callback method for the Transport to report connection errors.  When called
      * the method will queue a new task to fire the failure error back to the listener.
@@ -840,39 +839,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void onTransportError(final Throwable error) {
         if (!serializer.isShutdown()) {
-            serializer.execute(new Runnable() {
-                @Override
-                public void run() {
-                    LOG.info("Transport failed: {}", error.getMessage());
-                    if (!closed.get()) {
-                        // We can't send any more output, so close the transport
-                        protonTransport.close_head();
-                        fireProviderException(error);
-                    }
+            serializer.execute(() -> {
+                LOG.info("Transport failed: {}", error.getMessage());
+                if (!closed.get()) {
+                    // We can't send any more output, so close the transport
+                    protonTransport.close_head();
+                    fireProviderException(error);
                 }
             });
         }
     }
 
-    public void scheduleExecuteAndPump(Runnable task) {
-        serializer.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    try {
-                        task.run();
-                    } finally {
-                        pumpToProtonTransport();
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
-
-                    fireProviderException(t);
-                }
-            }
-        });
-    }
-
     /**
      * Callback method for the Transport to report that the underlying connection
      * has closed.  When called this method will queue a new task that will check for
@@ -882,15 +859,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void onTransportClosed() {
         if (!serializer.isShutdown()) {
-            serializer.execute(new Runnable() {
-                @Override
-                public void run() {
-                    LOG.debug("Transport connection remotely closed");
-                    if (!closed.get()) {
-                        // We can't send any more output, so close the transport
-                        protonTransport.close_head();
-                        fireProviderException(new IOException("Transport connection remotely closed."));
-                    }
+            serializer.execute(() -> {
+                LOG.debug("Transport connection remotely closed");
+                if (!closed.get()) {
+                    // We can't send any more output, so close the transport
+                    protonTransport.close_head();
+                    fireProviderException(new IOException("Transport connection remotely closed."));
                 }
             });
         }
@@ -1412,14 +1386,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
      */
     public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final Exception error) {
         if (timeout != JmsConnectionInfo.INFINITE) {
-            return serializer.schedule(new Runnable() {
-
-                @Override
-                public void run() {
-                    request.onFailure(error);
-                    pumpToProtonTransport();
-                }
-
+            return serializer.schedule(() -> {
+                request.onFailure(error);
+                pumpToProtonTransport();
             }, timeout, TimeUnit.MILLISECONDS);
         }
 
@@ -1442,14 +1411,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
      */
     public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final AmqpExceptionBuilder builder) {
         if (timeout != JmsConnectionInfo.INFINITE) {
-            return serializer.schedule(new Runnable() {
-
-                @Override
-                public void run() {
-                    request.onFailure(builder.createException());
-                    pumpToProtonTransport();
-                }
-
+            return serializer.schedule(() -> {
+                request.onFailure(builder.createException());
+                pumpToProtonTransport();
             }, timeout, TimeUnit.MILLISECONDS);
         }
 
@@ -1468,8 +1432,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
-    private Mechanism findSaslMechanism(String[] remoteMechanisms) throws JMSSecurityRuntimeException {
+    private void checkConnected() throws ProviderClosedException, ProviderFailedException {
+        if (serializer == null) {
+            throw new ProviderException("Transport has not been properly connected.");
+        }
+    }
 
+    private Mechanism findSaslMechanism(String[] remoteMechanisms) throws JMSSecurityRuntimeException {
         final String username;
         if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.USERNAME_OVERRIDE)) {
             username = (String) connectionInfo.getExtensionMap().get(

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
index 5c92b06..d9c50ed 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -19,6 +19,8 @@ package org.apache.qpid.jms.transports;
 import java.io.IOException;
 import java.net.URI;
 import java.security.Principal;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 import javax.net.ssl.SSLContext;
 
@@ -33,13 +35,19 @@ public interface Transport {
      * Performs the connect operation for the implemented Transport type
      * such as a TCP socket connection, SSL/TLS handshake etc.
      *
+     * @param initRoutine
+     *          a runnable initialization method that is executed in the context
+     *          of the transport's IO thread to allow thread safe setup of resources
+     *          that will be run from the transport executor service.
      * @param sslContextOverride
      *          a user-provided SSLContext to use if establishing a secure
      *          connection, overrides applicable URI configuration
      *
+     * @return A ScheduledExecutorService that can run work on the Transport IO thread.
+     *
      * @throws IOException if an error occurs while attempting the connect.
      */
-    void connect(SSLContext sslContextOverride) throws IOException;
+    ScheduledExecutorService connect(Runnable initRoutine, SSLContext sslContextOverride) throws IOException;
 
     /**
      * @return true if transport is connected or false if the connection is down.
@@ -119,6 +127,22 @@ public interface Transport {
     void setTransportListener(TransportListener listener);
 
     /**
+     * @return the {@link ThreadFactory} used to create the IO thread for this Transport
+     */
+    ThreadFactory getThreadFactory();
+
+    /**
+     * Sets the {@link ThreadFactory} that the Transport should use when creating the Transport
+     * IO thread for processing.
+     *
+     * @param factory
+     *          the {@link ThreadFactory} to use when creating the Transport IO thread
+     *
+     * @throws IllegalStateException if called after a call to {@link #connect(Runnable, SSLContext)}
+     */
+    void setThreadFactory(ThreadFactory factory);
+
+    /**
      * @return the TransportOptions instance that holds the configuration for this Transport.
      */
     TransportOptions getTransportOptions();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 5df7899..e9c66b7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.security.Principal;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -74,6 +76,7 @@ public class NettyTcpTransport implements Transport {
     protected EventLoopGroup group;
     protected Channel channel;
     protected TransportListener listener;
+    protected ThreadFactory ioThreadfactory;
     protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
 
     private final boolean secure;
@@ -126,26 +129,27 @@ public class NettyTcpTransport implements Transport {
     }
 
     @Override
-    public void connect(SSLContext sslContextOverride) throws IOException {
+    public ScheduledExecutorService connect(final Runnable initRoutine, SSLContext sslContextOverride) throws IOException {
+        if (closed.get()) {
+            throw new IllegalStateException("Transport has already been closed");
+        }
 
         if (listener == null) {
             throw new IllegalStateException("A transport listener must be set before connection attempts.");
         }
 
-        getTransportOptions().setSslContextOverride(sslContextOverride);
-
         boolean useKQueue = getTransportOptions().isUseKQueue() && KQueue.isAvailable();
         boolean useEpoll = getTransportOptions().isUseEpoll() && Epoll.isAvailable();
 
         if (useKQueue) {
             LOG.trace("Netty Transport using KQueue mode");
-            group = new KQueueEventLoopGroup(1);
+            group = new KQueueEventLoopGroup(1, ioThreadfactory);
         } else if (useEpoll) {
             LOG.trace("Netty Transport using Epoll mode");
-            group = new EpollEventLoopGroup(1);
+            group = new EpollEventLoopGroup(1, ioThreadfactory);
         } else {
             LOG.trace("Netty Transport using NIO mode");
-            group = new NioEventLoopGroup(1);
+            group = new NioEventLoopGroup(1, ioThreadfactory);
         }
 
         bootstrap = new Bootstrap();
@@ -160,11 +164,19 @@ public class NettyTcpTransport implements Transport {
         bootstrap.handler(new ChannelInitializer<Channel>() {
             @Override
             public void initChannel(Channel connectedChannel) throws Exception {
+                if (initRoutine != null) {
+                    try {
+                        initRoutine.run();
+                    } catch (Throwable initError) {
+                        failureCause = IOExceptionSupport.create(initError);
+                    }
+                }
                 configureChannel(connectedChannel);
             }
         });
 
         configureNetty(bootstrap, getTransportOptions());
+        getTransportOptions().setSslContextOverride(sslContextOverride);
 
         ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
         future.addListener(new ChannelFutureListener() {
@@ -202,16 +214,14 @@ public class NettyTcpTransport implements Transport {
             throw failureCause;
         } else {
             // Connected, allow any held async error to fire now and close the transport.
-            channel.eventLoop().execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    if (failureCause != null) {
-                        channel.pipeline().fireExceptionCaught(failureCause);
-                    }
+            channel.eventLoop().execute(() -> {
+                if (failureCause != null) {
+                    channel.pipeline().fireExceptionCaught(failureCause);
                 }
             });
         }
+
+        return group;
     }
 
     @Override
@@ -316,6 +326,20 @@ public class NettyTcpTransport implements Transport {
         return maxFrameSize;
     }
 
+    @Override
+    public ThreadFactory getThreadFactory() {
+        return ioThreadfactory;
+    }
+
+    @Override
+    public void setThreadFactory(ThreadFactory factory) {
+        if (isConnected() || channel != null) {
+            throw new IllegalStateException("Cannot set IO ThreadFactory after Transport connect");
+        }
+
+        this.ioThreadfactory = factory;
+    }
+
     //----- Internal implementation details, can be overridden as needed -----//
 
     protected String getRemoteHost() {
@@ -349,7 +373,13 @@ public class NettyTcpTransport implements Transport {
         LOG.trace("Channel has gone inactive! Channel is {}", channel);
         if (connected.compareAndSet(true, false) && !closed.get()) {
             LOG.trace("Firing onTransportClosed listener");
-            listener.onTransportClosed();
+            if (channel.eventLoop().inEventLoop()) {
+                listener.onTransportClosed();
+            } else {
+                channel.eventLoop().execute(() -> {
+                    listener.onTransportClosed();
+                });
+            }
         }
     }
 
@@ -357,10 +387,20 @@ public class NettyTcpTransport implements Transport {
         LOG.trace("Exception on channel! Channel is {}", channel);
         if (connected.compareAndSet(true, false) && !closed.get()) {
             LOG.trace("Firing onTransportError listener");
-            if (failureCause != null) {
-                listener.onTransportError(failureCause);
+            if (channel.eventLoop().inEventLoop()) {
+                if (failureCause != null) {
+                    listener.onTransportError(failureCause);
+                } else {
+                    listener.onTransportError(cause);
+                }
             } else {
-                listener.onTransportError(cause);
+                channel.eventLoop().execute(() -> {
+                    if (failureCause != null) {
+                        listener.onTransportError(failureCause);
+                    } else {
+                        listener.onTransportError(cause);
+                    }
+                });
             }
         } else {
             // Hold the first failure for later dispatch if connect succeeds.
@@ -498,8 +538,15 @@ public class NettyTcpTransport implements Transport {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
-            LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
-            listener.onData(buffer);
+            LOG.trace("New data read: {} bytes incomsing: {}", buffer.readableBytes(), buffer);
+            // Ensure the listener is only ever invoked from the channel's event loop thread
+            if (channel.eventLoop().inEventLoop()) {
+                listener.onData(buffer);
+            } else {
+                channel.eventLoop().execute(() -> {
+                    listener.onData(buffer);
+                });
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 53f3c6a..bcafa45 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.ProviderException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -377,27 +378,32 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
     @Test(timeout = 20000)
     public void testErrorDuringCreateResourceFailsRequest() throws Exception {
-        doErrorDuringOperationFailsRequesTTestImpl(Op.CREATE);
+        doErrorDuringOperationFailsRequestTestImpl(Op.CREATE);
     }
 
     @Test(timeout = 20000)
     public void testErrorDuringStartResourceFailsRequest() throws Exception {
-        doErrorDuringOperationFailsRequesTTestImpl(Op.START);
+        doErrorDuringOperationFailsRequestTestImpl(Op.START);
     }
 
     @Test(timeout = 20000)
     public void testErrorDuringStopResourceFailsRequest() throws Exception {
-        doErrorDuringOperationFailsRequesTTestImpl(Op.STOP);
+        doErrorDuringOperationFailsRequestTestImpl(Op.STOP);
     }
 
     @Test(timeout = 20000)
     public void testErrorDuringDestroyResourceFailsRequest() throws Exception {
-        doErrorDuringOperationFailsRequesTTestImpl(Op.DESTROY);
+        doErrorDuringOperationFailsRequestTestImpl(Op.DESTROY);
     }
 
-    private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws Exception {
+    private void doErrorDuringOperationFailsRequestTestImpl(Op operation) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
+            provider.connect(connectionInfo);
 
             final AtomicBoolean errorThrown = new AtomicBoolean();
             JmsResource resourceInfo = new JmsAbstractResource() {
@@ -449,6 +455,94 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testCreateResourceFailsWhenNoConnectCalled() throws Exception {
+        doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.CREATE);
+    }
+
+    @Test(timeout = 20000)
+    public void testStartResourceFailsWhenNoConnectCalled() throws Exception {
+        doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.START);
+    }
+
+    @Test(timeout = 20000)
+    public void testStopResourceFailsWhenNoConnectCalled() throws Exception {
+        doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.STOP);
+    }
+
+    @Test(timeout = 20000)
+    public void testDestroyResourceFailsWhenNoConnectCalled() throws Exception {
+        doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.DESTROY);
+    }
+
+    private void doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op operation) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
+
+            final AtomicBoolean errorThrown = new AtomicBoolean();
+            JmsResource resourceInfo = new JmsAbstractResource() {
+                @Override
+                public void visit(JmsResourceVistor visitor) {
+                    errorThrown.set(true);
+                    throw new Error("Deliberate error for testing");
+                }
+
+                @Override
+                public JmsResourceId getId() {
+                    return new JmsAbstractResourceId() {
+                    };
+                }
+            };
+
+            assertFalse("Error should not have been thrown", errorThrown.get());
+            ProviderFuture request = provider.newProviderFuture();
+
+            switch(operation) {
+            case CREATE:
+                try {
+                    provider.create(resourceInfo, request);
+                    fail("Request should have failed");
+                } catch (ProviderException e) {
+                    // Expected
+                }
+                break;
+            case START:
+                try {
+                    provider.start(resourceInfo, request);
+                    fail("Request should have failed");
+                } catch (ProviderException e) {
+                    // Expected
+                }
+                break;
+            case STOP:
+                try {
+                    provider.stop(resourceInfo, request);
+                    fail("Request should have failed");
+                } catch (ProviderException e) {
+                    // Expected
+                }
+                break;
+            case DESTROY:
+                try {
+                    provider.destroy(resourceInfo, request);
+                    fail("Request should have failed");
+                } catch (ProviderException e) {
+                    // Expected
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected operation given");
+            }
+
+            assertFalse("Error should not have been thrown", errorThrown.get());
+
+            provider.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnectionInfo createConnectionInfo() {
         JmsConnectionId connectionId = new JmsConnectionId(connectionIdGenerator.generateId());
         JmsConnectionInfo connectionInfo = new JmsConnectionInfo(connectionId);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
index 3c78f16..b8ef700 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
@@ -77,7 +77,7 @@ public class NettyOpenSslTransportTest extends NettySslTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -125,7 +125,7 @@ public class NettyOpenSslTransportTest extends NettySslTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
-                transport.connect(sslContext);
+                transport.connect(null, sslContext);
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
index f5d71be..9b4f47a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
@@ -84,7 +84,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(false));
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 fail("Should not have connected to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Connection failed to untrusted test server: {}", serverLocation);
@@ -116,7 +116,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 fail("Should not have connected to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Connection failed to untrusted test server: {}", serverLocation);
@@ -138,7 +138,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(true));
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connection established to untrusted test server: {}", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -168,7 +168,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             NettyTcpTransport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connection established to test server: {}", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -208,7 +208,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             NettyTcpTransport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 LOG.info("Connection established to test server: {}", serverLocation);
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -262,7 +262,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
             Transport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
-                transport.connect(null);
+                transport.connect(null, null);
                 if (verifyHost) {
                     fail("Should not have connected to the server: " + serverLocation);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org