You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/08 22:44:40 UTC

[1/2] qpid-jms git commit: Heavy refactoring of the Transport layer in the client.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 24bd2a919 -> abde5ef20


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
new file mode 100644
index 0000000..bb996f5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.util.InetAddressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class PlainTcpTransport implements Transport, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PlainTcpTransport.class);
+
+    private TransportListener listener;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private final Socket socket;
+    private DataOutputStream dataOut;
+    private DataInputStream dataIn;
+    private Thread runner;
+
+    private TransportOptions options;
+
+    private boolean closeAsync = true;
+    private boolean useLocalHost = false;
+    private int ioBufferSize = 8 * 1024;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public PlainTcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public PlainTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+
+        Socket temp = null;
+        try {
+            temp = createSocketFactory().createSocket();
+        } catch (IOException e) {
+            connectionError.set(e);
+        }
+
+        this.socket = temp;
+    }
+
+    @Override
+    public void connect() throws IOException {
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+
+        if (socket == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        socket.connect(remoteAddress);
+
+        connected.set(true);
+
+        initialiseSocket(socket);
+        initializeStreams();
+
+        runner = new Thread(null, this, "QpidJMS " + getClass().getSimpleName() + ": " + toString());
+        runner.setDaemon(false);
+        runner.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            if (socket == null) {
+                return;
+            }
+
+            // Closing the streams flush the sockets before closing.. if the socket
+            // is hung.. then this hangs the close so we support an asynchronous close
+            // by default which will timeout if the close doesn't happen after a delay.
+            if (closeAsync) {
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                final ExecutorService closer = Executors.newSingleThreadExecutor();
+                closer.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.trace("Closing socket {}", socket);
+                        try {
+                            socket.close();
+                            LOG.debug("Closed socket {}", socket);
+                        } catch (IOException e) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+                            }
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+                });
+
+                try {
+                    latch.await(1,TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    closer.shutdownNow();
+                }
+            } else {
+                LOG.trace("Closing socket {}", socket);
+                try {
+                    socket.close();
+                    LOG.debug("Closed socket {}", socket);
+                } catch (IOException e) {
+                    LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        LOG.info("RawTcpTransport sending packet of size: {}", output.remaining());
+        WritableByteChannel channel = Channels.newChannel(dataOut);
+        channel.write(output);
+        dataOut.flush();
+    }
+
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        send(output.nioBuffer());
+    }
+
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    public int getIoBufferSize() {
+        return ioBufferSize;
+    }
+
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    public boolean isCloseAsync() {
+        return closeAsync;
+    }
+
+    public void setCloseAsync(boolean closeAsync) {
+        this.closeAsync = closeAsync;
+    }
+
+    //---------- Transport internal implementation ---------------------------//
+
+    @Override
+    public void run() {
+        LOG.trace("TCP consumer thread for " + this + " starting");
+        try {
+            while (isConnected()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            connectionError.set(e);
+            onException(e);
+        } catch (Throwable e) {
+            IOException ioe = new IOException("Unexpected error occured: " + e);
+            connectionError.set(ioe);
+            ioe.initCause(e);
+            onException(ioe);
+        }
+    }
+
+    protected void doRun() throws IOException {
+        int size = dataIn.available();
+        if (size <= 0) {
+            try {
+                TimeUnit.NANOSECONDS.sleep(1);
+            } catch (InterruptedException e) {
+            }
+            return;
+        }
+
+        byte[] buffer = new byte[size];
+        dataIn.readFully(buffer);
+        listener.onData(Unpooled.wrappedBuffer(buffer));
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener
+     */
+    public void onException(IOException e) {
+        if (listener != null) {
+            try {
+                listener.onTransportError(e);
+            } catch (RuntimeException e2) {
+                LOG.debug("Unexpected runtime exception: " + e2, e2);
+            }
+        }
+    }
+
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        TransportOptions options = getTransportOptions();
+
+        try {
+            sock.setReceiveBufferSize(options.getReceiveBufferSize());
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket receive buffer size = {}", options.getReceiveBufferSize());
+            LOG.debug("Cannot set socket receive buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+        }
+
+        try {
+            sock.setSendBufferSize(options.getSendBufferSize());
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket send buffer size = {}", options.getSendBufferSize());
+            LOG.debug("Cannot set socket send buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+        }
+
+        sock.setSoTimeout(options.getSoTimeout());
+        sock.setKeepAlive(options.isTcpKeepAlive());
+        sock.setTcpNoDelay(options.isTcpNoDelay());
+
+        if (options.getSoLinger() > 0) {
+            sock.setSoLinger(true, options.getSoLinger());
+        } else {
+            sock.setSoLinger(false, 0);
+        }
+    }
+
+    protected void initializeStreams() throws IOException {
+        try {
+            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+            this.dataIn = new DataInputStream(buffIn);
+            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+            this.dataOut = new DataOutputStream(outputStream);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (isUseLocalHost()) {
+            String localName = InetAddressUtil.getLocalHostName();
+            if (localName != null && localName.equals(host)) {
+                return "localhost";
+            }
+        }
+        return host;
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
new file mode 100644
index 0000000..3894408
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * {@link TransportFactory} that builds {@link PlainTcpTransport} instances, configured
+ * from the "transport." prefixed query options of the remote URI.
+ */
+public class PlainTcpTransportFactory extends TransportFactory {
+
+    /**
+     * Creates a transport for the given URI after applying its "transport." options
+     * to a fresh {@link TransportOptions}.
+     *
+     * @throws IllegalArgumentException
+     *         if any of the transport options could not be applied.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+
+        final Map<String, String> queryParams = PropertyUtil.parseQuery(remoteURI.getQuery());
+        final Map<String, String> transportParams = PropertyUtil.filterProperties(queryParams, "transport.");
+
+        final URI targetURI = PropertyUtil.replaceQuery(remoteURI, queryParams);
+
+        final TransportOptions transportOptions = new TransportOptions();
+
+        final boolean allApplied = PropertyUtil.setProperties(transportOptions, transportParams);
+        if (allApplied) {
+            return doCreateTransport(targetURI, transportOptions);
+        }
+
+        throw new IllegalArgumentException(
+            " Not all transport options could be set on the Transport." +
+            " Check the options are spelled correctly." +
+            " Given parameters=[" + transportParams + "]." +
+            " This provider instance cannot be started.");
+    }
+
+    /**
+     * Hook for subclasses to supply a different transport implementation.
+     */
+    protected PlainTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new PlainTcpTransport(remoteURI, transportOptions);
+    }
+
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
new file mode 100644
index 0000000..ee4bf5c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A lightweight buffered input stream intended for TCP sockets. It has no mark/reset
+ * support, and bulk reads at least as large as the internal buffer go straight to the
+ * wrapped stream.
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+    protected byte[] internalBuffer;
+    protected int count;
+    protected int position;
+
+    public TcpBufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * @throws IllegalArgumentException
+     *         if size is not positive.
+     */
+    public TcpBufferedInputStream(InputStream in, int size) {
+        super(in);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        internalBuffer = new byte[size];
+    }
+
+    /**
+     * Discards whatever is buffered and refills the buffer with a single read from the
+     * wrapped stream; the buffer stays empty when that read returns no data.
+     */
+    protected void fill() throws IOException {
+        position = 0;
+        count = 0;
+        final int loaded = in.read(internalBuffer, 0, internalBuffer.length);
+        if (loaded > 0) {
+            count = loaded;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (position >= count) {
+            fill();
+        }
+        if (position >= count) {
+            // Refill produced nothing.
+            return -1;
+        }
+        return internalBuffer[position++] & 0xff;
+    }
+
+    // Performs one read step: drain the buffer if it has data, otherwise refill it,
+    // or bypass it when the request is at least as large as the buffer.
+    private int readStream(byte[] b, int off, int len) throws IOException {
+        int buffered = count - position;
+        if (buffered <= 0) {
+            if (len >= internalBuffer.length) {
+                return in.read(b, off, len);
+            }
+            fill();
+            buffered = count - position;
+            if (buffered <= 0) {
+                return -1;
+            }
+        }
+        final int copied = Math.min(buffered, len);
+        System.arraycopy(internalBuffer, position, b, off, copied);
+        position += copied;
+        return copied;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int capacity = b.length;
+        final int end = off + len;
+        if (off < 0 || len < 0 || end < 0 || capacity - end < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+
+        int total = 0;
+        while (true) {
+            final int step = readStream(b, off + total, len - total);
+            if (step <= 0) {
+                return (total == 0) ? step : total;
+            }
+            total += step;
+            if (total >= len) {
+                return total;
+            }
+            // Stop rather than block once the wrapped stream has nothing more ready.
+            final InputStream source = in;
+            if (source != null && source.available() <= 0) {
+                return total;
+            }
+        }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        final long buffered = count - position;
+        if (buffered <= 0) {
+            return in.skip(n);
+        }
+        final long skipped = Math.min(buffered, n);
+        position += (int) skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        final int buffered = count - position;
+        return in.available() + buffered;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in == null) {
+            return;
+        }
+        in.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
new file mode 100644
index 0000000..84359ae
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A lightweight buffered output stream intended for TCP sockets. Writes that do not
+ * fit in the buffer are passed straight to the wrapped stream.
+ */
+public class TcpBufferedOutputStream extends FilterOutputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+    private final byte[] buffer;
+    private final int capacity;
+    private int count;
+
+    /**
+     * Creates a buffered output stream with the default buffer size.
+     *
+     * @param out
+     *        the underlying output stream.
+     */
+    public TcpBufferedOutputStream(OutputStream out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a buffered output stream that writes to the given underlying stream
+     * using a buffer of the given size.
+     *
+     * @param out
+     *        the underlying output stream.
+     * @param size
+     *        the buffer size.
+     * @throws IllegalArgumentException
+     *         if size is not positive.
+     */
+    public TcpBufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        capacity = size;
+        buffer = new byte[size];
+    }
+
+    /**
+     * Buffers a single byte, emptying the buffer first when it is full.
+     *
+     * @param b
+     *        the byte to write
+     * @throws IOException
+     *         if emptying the buffer fails.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= capacity) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    /**
+     * Writes part of a byte array. A null array is silently ignored. Data that does
+     * not fit in the remaining space forces the buffer out first, and data larger
+     * than the whole buffer bypasses it.
+     *
+     * @param b
+     *        the source array
+     * @param off
+     *        the offset of the first byte to write
+     * @param len
+     *        the number of bytes to write
+     * @throws IOException
+     *         if writing to the underlying stream fails.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            return;
+        }
+
+        final int free = capacity - count;
+        if (len > free) {
+            flush();
+        }
+
+        if (len > buffer.length) {
+            out.write(b, off, len);
+            return;
+        }
+
+        System.arraycopy(b, off, buffer, count, len);
+        count += len;
+    }
+
+    /**
+     * Pushes any buffered bytes to the underlying stream. The underlying stream's own
+     * flush() is deliberately not called.
+     *
+     * @throws IOException
+     *         if writing to the underlying stream fails.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (out == null || count <= 0) {
+            return;
+        }
+        out.write(buffer, 0, count);
+        count = 0;
+    }
+
+    /**
+     * Closes this stream by delegating to the superclass.
+     *
+     * @throws IOException
+     *         if closing fails.
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
new file mode 100644
index 0000000..fd312f8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.vertx.java.core.net.NetClient;
+
+/**
+ * Provides SSL configuration to the Vert.x NetClient object used by the underlying
+ * TCP based Transport.
+ * <p>
+ * The key store and trust store settings come from a {@link JmsSslContext} that must
+ * be supplied through {@link #setContext(JmsSslContext)} before the client is
+ * configured; the constructors do not assign one.
+ */
+public class SslTransport extends TcpTransport {
+
+    // TODO - remove with SSL configuration placed in Transport options.
+    private JmsSslContext context;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(URI remoteLocation, TransportOptions options) {
+        super(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        super(listener, remoteLocation, options);
+    }
+
+    /**
+     * Applies the base configuration and then enables SSL on the client using the
+     * key store and trust store settings of the current context.
+     *
+     * @throws IOException
+     *         if no SSL context has been set, or the base configuration fails.
+     */
+    @Override
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        // Check before touching the client so a missing context fails with a clear
+        // message instead of a NullPointerException on a partly configured client.
+        final JmsSslContext sslContext = context;
+        if (sslContext == null) {
+            throw new IOException("Cannot configure SSL: no JmsSslContext has been set on this transport.");
+        }
+
+        super.configureNetClient(client, options);
+
+        client.setSSL(true);
+        client.setKeyStorePath(sslContext.getKeyStoreLocation());
+        client.setKeyStorePassword(sslContext.getKeyStorePassword());
+        client.setTrustStorePath(sslContext.getTrustStoreLocation());
+        client.setTrustStorePassword(sslContext.getTrustStorePassword());
+    }
+
+    /**
+     * @return the SSL context, or null if none has been set
+     */
+    public JmsSslContext getContext() {
+        return context;
+    }
+
+    /**
+     * @param context the SSL context to use when configuring the client
+     */
+    public void setContext(JmsSslContext context) {
+        this.context = context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
new file mode 100644
index 0000000..b7c738c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+
+/**
+ * Factory that produces {@link SslTransport} instances, each given the SSL context
+ * returned by {@link JmsSslContext#getCurrentSslContext()} at creation time.
+ */
+public class SslTransportFactory extends TcpTransportFactory {
+
+    /**
+     * Builds an SslTransport for the given URI and options and hands it the current
+     * SSL context.
+     */
+    @Override
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        final SslTransport sslTransport = new SslTransport(remoteURI, transportOptions);
+        final JmsSslContext currentContext = JmsSslContext.getCurrentSslContext();
+
+        sslTransport.setContext(currentContext);
+
+        return sslTransport;
+    }
+
+    @Override
+    public String getName() {
+        return "SSL";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
new file mode 100644
index 0000000..e824ec4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.impl.DefaultVertxFactory;
+import org.vertx.java.core.net.NetClient;
+import org.vertx.java.core.net.NetSocket;
+
+/**
+ * Vert.x based TCP transport for raw data packets.
+ */
+public class TcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+    private final Vertx vertx;
+    private final NetClient client;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private NetSocket socket;
+    private TransportListener listener;
+    private TransportOptions options;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+
+        DefaultVertxFactory vertxFactory = new DefaultVertxFactory();
+        this.vertx = vertxFactory.createVertx();
+        this.client = vertx.createNetClient();
+    }
+
+    @Override
+    public void connect() throws IOException {
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        configureNetClient(client, getTransportOptions());
+
+        try {
+            client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
+                @Override
+                public void handle(AsyncResult<NetSocket> asyncResult) {
+                    if (asyncResult.succeeded()) {
+                        socket = asyncResult.result();
+                        LOG.info("We have connected! Socket is {}", socket);
+
+                        connected.set(true);
+                        connectLatch.countDown();
+
+                        socket.dataHandler(new Handler<Buffer>() {
+                            @Override
+                            public void handle(Buffer event) {
+                                listener.onData(event.getByteBuf());
+                            }
+                        });
+
+                        socket.closeHandler(new Handler<Void>() {
+                            @Override
+                            public void handle(Void event) {
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportClosed();
+                                }
+                            }
+                        });
+
+                        socket.exceptionHandler(new Handler<Throwable>() {
+                            @Override
+                            public void handle(Throwable event) {
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportError(event);
+                                }
+                            }
+                        });
+
+                    } else {
+                        connected.set(false);
+                        connectionError.set(asyncResult.cause());
+                        connectLatch.countDown();
+                    }
+                }
+            });
+        } catch (Throwable reason) {
+            LOG.info("Failed to connect to target Broker: {}", reason);
+            throw IOExceptionSupport.create(reason);
+        }
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            if (connected.get()) {
+                socket.close();
+                connected.set(false);
+            }
+
+            vertx.stop();
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        int length = output.remaining();
+        if (length == 0) {
+            return;
+        }
+
+        byte[] copy = new byte[length];
+        output.get(copy);
+        Buffer sendBuffer = new Buffer(copy);
+
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        Buffer sendBuffer = new Buffer(output.copy());
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Allows a subclass to configure the NetClient beyond what this transport might do.
+     *
+     * @throws IOException if an error occurs.
+     */
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        client.setSendBufferSize(options.getSendBufferSize());
+        client.setReceiveBufferSize(options.getReceiveBufferSize());
+        client.setSoLinger(options.getSoLinger());
+        client.setTCPKeepAlive(options.isTcpKeepAlive());
+        client.setTCPNoDelay(options.isTcpNoDelay());
+        if (options.getConnectTimeout() >= 0) {
+            client.setConnectTimeout(options.getConnectTimeout());
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    /**
+     * @return the options used to configure the TCP socket.
+     */
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
new file mode 100644
index 0000000..8385dff
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Factory for creating the Vert.x based TCP Transport
+ */
+public class TcpTransportFactory extends TransportFactory {
+
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+
+        Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
+        Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport.");
+
+        remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+
+        TransportOptions transportOptions = new TransportOptions();
+
+        if (!PropertyUtil.setProperties(transportOptions, transportURIOptions)) {
+            String msg = " Not all transport options could be set on the Transport." +
+                         " Check the options are spelled correctly." +
+                         " Given parameters=[" + transportURIOptions + "]." +
+                         " This provider instance cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+
+        Transport result = doCreateTransport(remoteURI, transportOptions);
+
+        return result;
+    }
+
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new TcpTransport(remoteURI, transportOptions);
+    }
+
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
new file mode 100644
index 0000000..eed5a6b
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.SslTransportFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
new file mode 100644
index 0000000..533bcd1
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.TcpTransportFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
index f4c5918..84197a4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
@@ -29,9 +29,9 @@ import java.util.List;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.NettyTcpTransport;
-import org.apache.qpid.jms.transports.TcpTransportOptions;
+import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
     private final List<ByteBuf> data = new ArrayList<ByteBuf>();
 
     private final TransportListener testListener = new NettyTransportListener();
-    private final TcpTransportOptions testOptions = new TcpTransportOptions();
+    private final TransportOptions testOptions = new TransportOptions();
 
     @Test(timeout = 60 * 1000)
     public void testConnectToServer() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: Heavy refactoring of the Transport layer in the client.

Posted by ta...@apache.org.
Heavy refactoring of the Transport layer in the client.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/abde5ef2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/abde5ef2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/abde5ef2

Branch: refs/heads/master
Commit: abde5ef20cc173f50d05afc812b93ad0a9cc892d
Parents: 24bd2a9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 8 16:44:07 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 8 16:44:07 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  53 ++-
 .../qpid/jms/provider/amqp/AmqpSslProvider.java |  47 ---
 .../provider/amqp/AmqpSslProviderFactory.java   |   7 +-
 .../qpid/jms/transports/NettyTcpTransport.java  | 197 ----------
 .../qpid/jms/transports/RawTcpTransport.java    | 377 -------------------
 .../qpid/jms/transports/SslTransport.java       |  59 ---
 .../jms/transports/TcpBufferedInputStream.java  | 139 -------
 .../jms/transports/TcpBufferedOutputStream.java | 126 -------
 .../qpid/jms/transports/TcpTransport.java       | 279 --------------
 .../jms/transports/TcpTransportOptions.java     | 153 --------
 .../qpid/jms/transports/TransportFactory.java   | 110 ++++++
 .../qpid/jms/transports/TransportOptions.java   | 155 ++++++++
 .../jms/transports/netty/NettyTcpTransport.java | 224 +++++++++++
 .../netty/NettyTcpTransportFactory.java         |  63 ++++
 .../jms/transports/plain/PlainTcpTransport.java | 362 ++++++++++++++++++
 .../plain/PlainTcpTransportFactory.java         |  63 ++++
 .../plain/TcpBufferedInputStream.java           | 139 +++++++
 .../plain/TcpBufferedOutputStream.java          | 126 +++++++
 .../qpid/jms/transports/vertx/SslTransport.java |  86 +++++
 .../transports/vertx/SslTransportFactory.java   |  42 +++
 .../qpid/jms/transports/vertx/TcpTransport.java | 254 +++++++++++++
 .../transports/vertx/TcpTransportFactory.java   |  63 ++++
 .../services/org/apache/qpid/jms/transports/ssl |  17 +
 .../services/org/apache/qpid/jms/transports/tcp |  17 +
 .../jms/test/netty/NettyTcpTransportTest.java   |   6 +-
 25 files changed, 1753 insertions(+), 1411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 98c788c..e68bc6e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -45,10 +45,9 @@ import org.apache.qpid.jms.provider.AbstractProvider;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
-import org.apache.qpid.jms.transports.TcpTransport;
+import org.apache.qpid.jms.transports.TransportFactory;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.jms.util.PropertyUtil;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Event;
@@ -84,9 +83,11 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     // NOTE: Limit default channel max to signed short range to deal with
     //       brokers that don't currently handle the unsigned range well.
     private static final int DEFAULT_CHANNEL_MAX = 32767;
+    private static final String DEFAULT_TRANSPORT_KEY = "tcp";
 
     private AmqpConnection connection;
     private org.apache.qpid.jms.transports.Transport transport;
+    private String transportKey = DEFAULT_TRANSPORT_KEY;
     private boolean traceFrames;
     private boolean traceBytes;
     private boolean presettleConsumers;
@@ -125,25 +126,12 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     public void connect() throws IOException {
         checkClosed();
 
-        transport = createTransport(getRemoteURI());
-
-        Map<String, String> map = null;
         try {
-            map = PropertyUtil.parseQuery(remoteURI.getQuery());
+            transport = TransportFactory.create(getTransportKey(), getRemoteURI());
         } catch (Exception e) {
             throw IOExceptionSupport.create(e);
         }
-        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "transport.");
-
-        if (!PropertyUtil.setProperties(transport, providerOptions)) {
-            String msg = ""
-                + " Not all transport options could be set on the AMQP Provider transport."
-                + " Check the options are spelled correctly."
-                + " Given parameters=[" + providerOptions + "]."
-                + " This provider instance cannot be started.";
-            throw new IOException(msg);
-        }
-
+        transport.setTransportListener(this);
         transport.connect();
     }
 
@@ -593,19 +581,6 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
         });
     }
 
-    /**
-     * Provides an extension point for subclasses to insert other types of transports such
-     * as SSL etc.
-     *
-     * @param remoteLocation
-     *        The remote location where the transport should attempt to connect.
-     *
-     * @return the newly created transport instance.
-     */
-    protected org.apache.qpid.jms.transports.Transport createTransport(URI remoteLocation) {
-        return new TcpTransport(this, remoteLocation);
-    }
-
     private void updateTracer() {
         if (isTraceFrames()) {
             ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -881,4 +856,22 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     public void setChannelMax(int channelMax) {
         this.channelMax = channelMax;
     }
+
+    /**
+     * @return the transportKey that will be used to create the network level connection.
+     */
+    public String getTransportKey() {
+        return transportKey;
+    }
+
+    /**
+     * Sets the transport key used to look up a Transport instance when an attempt
+     * is made to connect to a remote peer.
+     *
+     * @param transportKey
+     *        the transportKey to use when looking up a Transport.
+     */
+    void setTransportKey(String transportKey) {
+        this.transportKey = transportKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
deleted file mode 100644
index af7fe7f..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.qpid.jms.JmsSslContext;
-import org.apache.qpid.jms.transports.SslTransport;
-import org.apache.qpid.jms.transports.Transport;
-
-/**
- * AmqpProvider extension that enables SSL based transports.
- */
-public class AmqpSslProvider extends AmqpProvider {
-
-    private final JmsSslContext sslContext;
-
-    public AmqpSslProvider(URI remoteURI) {
-        super(remoteURI);
-        this.sslContext = JmsSslContext.getCurrentSslContext();
-    }
-
-    public AmqpSslProvider(URI remoteURI, Map<String, String> extraOptions) {
-        super(remoteURI, extraOptions);
-        this.sslContext = JmsSslContext.getCurrentSslContext();
-    }
-
-    @Override
-    protected Transport createTransport(URI remoteLocation) {
-        return new SslTransport(this, remoteLocation, sslContext);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
index 01a4f85..a19af00 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
@@ -27,6 +27,11 @@ public class AmqpSslProviderFactory extends AmqpProviderFactory {
 
     @Override
     public Provider createProvider(URI remoteURI) throws Exception {
-        return new AmqpSslProvider(remoteURI);
+        AmqpProvider provider = new AmqpProvider(remoteURI);
+        // TODO - Would be better if we could do away with this and define
+        //        the transport key in the properties file used to find the
+        //        AmqpProvider instance.
+        provider.setTransportKey("ssl");
+        return provider;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
deleted file mode 100644
index 49637c7..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
-
-/**
- * TCP based transport that uses Netty as the underlying IO layer.
- */
-public class NettyTcpTransport implements Transport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
-
-    private Bootstrap bootstrap;
-    private EventLoopGroup group;
-    private Channel channel;
-    private TransportListener listener;
-    private final TcpTransportOptions options;
-    private final URI remote;
-
-    private final AtomicBoolean connected = new AtomicBoolean();
-    private final AtomicBoolean closed = new AtomicBoolean();
-
-    /**
-     * Create a new transport instance
-     *
-     * @param options
-     *        the transport options used to configure the socket connection.
-     */
-    public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) {
-        this.options = options;
-        this.listener = listener;
-        this.remote = remoteLocation;
-    }
-
-    @Override
-    public void connect() throws IOException {
-
-        if (listener == null) {
-            throw new IllegalStateException("A transport listener must be set before connection attempts.");
-        }
-
-        group = new NioEventLoopGroup();
-
-        bootstrap = new Bootstrap();
-        bootstrap.group(group);
-        bootstrap.channel(NioSocketChannel.class);
-        bootstrap.handler(new ChannelInitializer<Channel>() {
-
-            @Override
-            public void initChannel(Channel connectedChannel) throws Exception {
-                channel = connectedChannel;
-                channel.pipeline().addLast(new NettyTcpTransportHandler());
-            }
-        });
-
-        configureNetty(bootstrap, options);
-
-        ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
-        future.awaitUninterruptibly();
-
-        if (future.isCancelled()) {
-            throw new IOException("Connection attempt was cancelled");
-        } else if (!future.isSuccess()) {
-            throw IOExceptionSupport.create(future.cause());
-        } else {
-            connected.set(true);
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return connected.get();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            channel.close();
-            group.shutdownGracefully();
-        }
-    }
-
-    @Override
-    public void send(ByteBuffer output) throws IOException {
-        send(Unpooled.wrappedBuffer(output));
-    }
-
-    @Override
-    public void send(ByteBuf output) throws IOException {
-        channel.write(output);
-        channel.flush();
-    }
-
-    @Override
-    public TransportListener getTransportListener() {
-        return listener;
-    }
-
-    @Override
-    public void setTransportListener(TransportListener listener) {
-        this.listener = listener;
-    }
-
-    //----- Internal implementation details ----------------------------------//
-
-    protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) {
-        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
-
-        if (options.getSendBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-        }
-
-        if (options.getReceiveBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_RCVBUF, options.getSendBufferSize());
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getSendBufferSize()));
-        }
-
-        if (options.getTrafficClass() != -1) {
-            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
-        }
-    }
-
-    //----- Handle connection events -----------------------------------------//
-
-    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
-        @Override
-        public void channelActive(ChannelHandlerContext context) throws Exception {
-            LOG.info("Channel has become active! Channel is {}", context.channel());
-        }
-
-        @Override
-        public void channelInactive(ChannelHandlerContext context) throws Exception {
-            LOG.info("Channel has gone inactive! Channel is {}", context.channel());
-            if (!closed.get()) {
-                connected.set(false);
-                listener.onTransportClosed();
-            }
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-            LOG.info("Exception on channel! Channel is {}", context.channel());
-            if (!closed.get()) {
-                connected.set(false);
-                listener.onTransportError(cause);
-            }
-        }
-
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
-            LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
-            listener.onData(buffer);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
deleted file mode 100644
index 210da61..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.SocketFactory;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.jms.util.InetAddressUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class RawTcpTransport implements Transport, Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RawTcpTransport.class);
-
-    private TransportListener listener;
-    private final URI remoteLocation;
-    private final AtomicBoolean connected = new AtomicBoolean();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
-
-    private final Socket socket;
-    private DataOutputStream dataOut;
-    private DataInputStream dataIn;
-    private Thread runner;
-
-    private boolean closeAsync = true;
-    private int socketBufferSize = 64 * 1024;
-    private int soTimeout = 0;
-    private int soLinger = Integer.MIN_VALUE;
-    private Boolean keepAlive;
-    private Boolean tcpNoDelay = true;
-    private boolean useLocalHost = false;
-    private int ioBufferSize = 8 * 1024;
-
-    /**
-     * Create a new instance of the transport.
-     *
-     * @param listener
-     *        The TransportListener that will receive data from this Transport instance.
-     * @param remoteLocation
-     *        The remote location where this transport should connection to.
-     */
-    public RawTcpTransport(TransportListener listener, URI remoteLocation) {
-        this.listener = listener;
-        this.remoteLocation = remoteLocation;
-
-        Socket temp = null;
-        try {
-            temp = createSocketFactory().createSocket();
-        } catch (IOException e) {
-            connectionError.set(e);
-        }
-
-        this.socket = temp;
-    }
-
-    @Override
-    public void connect() throws IOException {
-        if (connectionError.get() != null) {
-            throw IOExceptionSupport.create(connectionError.get());
-        }
-
-        if (socket == null) {
-            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
-        }
-
-        InetSocketAddress remoteAddress = null;
-
-        if (remoteLocation != null) {
-            String host = resolveHostName(remoteLocation.getHost());
-            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-        }
-
-        socket.connect(remoteAddress);
-
-        connected.set(true);
-
-        initialiseSocket(socket);
-        initializeStreams();
-
-        runner = new Thread(null, this, "QpidJMS RawTcpTransport: " + toString());
-        runner.setDaemon(false);
-        runner.start();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            if (socket == null) {
-                return;
-            }
-
-            // Closing the streams flush the sockets before closing.. if the socket
-            // is hung.. then this hangs the close so we support an asynchronous close
-            // by default which will timeout if the close doesn't happen after a delay.
-            if (closeAsync) {
-                final CountDownLatch latch = new CountDownLatch(1);
-
-                final ExecutorService closer = Executors.newSingleThreadExecutor();
-                closer.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        LOG.trace("Closing socket {}", socket);
-                        try {
-                            socket.close();
-                            LOG.debug("Closed socket {}", socket);
-                        } catch (IOException e) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
-                            }
-                        } finally {
-                            latch.countDown();
-                        }
-                    }
-                });
-
-                try {
-                    latch.await(1,TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } finally {
-                    closer.shutdownNow();
-                }
-            } else {
-                LOG.trace("Closing socket {}", socket);
-                try {
-                    socket.close();
-                    LOG.debug("Closed socket {}", socket);
-                } catch (IOException e) {
-                    LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void send(ByteBuffer output) throws IOException {
-        checkConnected();
-        LOG.info("RawTcpTransport sending packet of size: {}", output.remaining());
-        WritableByteChannel channel = Channels.newChannel(dataOut);
-        channel.write(output);
-        dataOut.flush();
-    }
-
-    @Override
-    public void send(ByteBuf output) throws IOException {
-        checkConnected();
-        send(output.nioBuffer());
-    }
-
-    @Override
-    public boolean isConnected() {
-        return this.connected.get();
-    }
-
-    @Override
-    public TransportListener getTransportListener() {
-        return this.listener;
-    }
-
-    @Override
-    public void setTransportListener(TransportListener listener) {
-        if (listener == null) {
-            throw new IllegalArgumentException("Listener cannot be set to null");
-        }
-
-        this.listener = listener;
-    }
-
-    public int getSocketBufferSize() {
-        return socketBufferSize;
-    }
-
-    public void setSocketBufferSize(int socketBufferSize) {
-        this.socketBufferSize = socketBufferSize;
-    }
-
-    public int getSoTimeout() {
-        return soTimeout;
-    }
-
-    public void setSoTimeout(int soTimeout) {
-        this.soTimeout = soTimeout;
-    }
-
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    public void setTcpNoDelay(Boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    public int getSoLinger() {
-        return soLinger;
-    }
-
-    public void setSoLinger(int soLinger) {
-        this.soLinger = soLinger;
-    }
-
-    public boolean isKeepAlive() {
-        return keepAlive;
-    }
-
-    public void setKeepAlive(Boolean keepAlive) {
-        this.keepAlive = keepAlive;
-    }
-
-    public boolean isUseLocalHost() {
-        return useLocalHost;
-    }
-
-    public void setUseLocalHost(boolean useLocalHost) {
-        this.useLocalHost = useLocalHost;
-    }
-
-    public int getIoBufferSize() {
-        return ioBufferSize;
-    }
-
-    public void setIoBufferSize(int ioBufferSize) {
-        this.ioBufferSize = ioBufferSize;
-    }
-
-    public boolean isCloseAsync() {
-        return closeAsync;
-    }
-
-    public void setCloseAsync(boolean closeAsync) {
-        this.closeAsync = closeAsync;
-    }
-
-    //---------- Transport internal implementation ---------------------------//
-
-    @Override
-    public void run() {
-        LOG.trace("TCP consumer thread for " + this + " starting");
-        try {
-            while (isConnected()) {
-                doRun();
-            }
-        } catch (IOException e) {
-            connectionError.set(e);
-            onException(e);
-        } catch (Throwable e) {
-            IOException ioe = new IOException("Unexpected error occured: " + e);
-            connectionError.set(ioe);
-            ioe.initCause(e);
-            onException(ioe);
-        }
-    }
-
-    protected void doRun() throws IOException {
-        int size = dataIn.available();
-        if (size <= 0) {
-            try {
-                TimeUnit.NANOSECONDS.sleep(1);
-            } catch (InterruptedException e) {
-            }
-            return;
-        }
-
-        byte[] buffer = new byte[size];
-        dataIn.readFully(buffer);
-        listener.onData(Unpooled.wrappedBuffer(buffer));
-    }
-
-    /**
-     * Passes any IO exceptions into the transport listener
-     */
-    public void onException(IOException e) {
-        if (listener != null) {
-            try {
-                listener.onTransportError(e);
-            } catch (RuntimeException e2) {
-                LOG.debug("Unexpected runtime exception: " + e2, e2);
-            }
-        }
-    }
-
-    protected SocketFactory createSocketFactory() throws IOException {
-        return SocketFactory.getDefault();
-    }
-
-    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
-        try {
-            sock.setReceiveBufferSize(socketBufferSize);
-            sock.setSendBufferSize(socketBufferSize);
-        } catch (SocketException se) {
-            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
-            LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
-        }
-
-        sock.setSoTimeout(soTimeout);
-
-        if (keepAlive != null) {
-            sock.setKeepAlive(keepAlive.booleanValue());
-        }
-
-        if (soLinger > -1) {
-            sock.setSoLinger(true, soLinger);
-        } else if (soLinger == -1) {
-            sock.setSoLinger(false, 0);
-        }
-
-        if (tcpNoDelay != null) {
-            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
-        }
-    }
-
-    protected void initializeStreams() throws IOException {
-        try {
-            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
-            this.dataIn = new DataInputStream(buffIn);
-            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
-            this.dataOut = new DataOutputStream(outputStream);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create(e);
-        }
-    }
-
-    protected String resolveHostName(String host) throws UnknownHostException {
-        if (isUseLocalHost()) {
-            String localName = InetAddressUtil.getLocalHostName();
-            if (localName != null && localName.equals(host)) {
-                return "localhost";
-            }
-        }
-        return host;
-    }
-
-    private void checkConnected() throws IOException {
-        if (!connected.get()) {
-            throw new IOException("Cannot send to a non-connected transport.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
deleted file mode 100644
index 0860aae..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.qpid.jms.JmsSslContext;
-import org.vertx.java.core.net.NetClient;
-
-/**
- * Provides SSL configuration to the Vert.x NetClient object used by the underling
- * TCP based Transport.
- */
-public class SslTransport extends TcpTransport {
-
-    private final JmsSslContext context;
-
-    /**
-     * Create an instance of the SSL transport
-     *
-     * @param listener
-     *        The TransportListener that will handle events from this Transport instance.
-     * @param remoteLocation
-     *        The location that is being connected to.
-     * @param context
-     *        The JMS Framework SslContext to use for this SSL connection.
-     */
-    public SslTransport(TransportListener listener, URI remoteLocation, JmsSslContext context) {
-        super(listener, remoteLocation);
-
-        this.context = context;
-    }
-
-    @Override
-    protected void configureNetClient(NetClient client) throws IOException {
-        super.configureNetClient(client);
-
-        client.setSSL(true);
-        client.setKeyStorePath(context.getKeyStoreLocation());
-        client.setKeyStorePassword(context.getKeyStorePassword());
-        client.setTrustStorePath(context.getTrustStoreLocation());
-        client.setTrustStorePassword(context.getTrustStorePassword());
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
deleted file mode 100644
index c7ba887..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An optimized buffered input stream for Tcp
- */
-public class TcpBufferedInputStream extends FilterInputStream {
-
-    private static final int DEFAULT_BUFFER_SIZE = 8192;
-    protected byte internalBuffer[];
-    protected int count;
-    protected int position;
-
-    public TcpBufferedInputStream(InputStream in) {
-        this(in, DEFAULT_BUFFER_SIZE);
-    }
-
-    public TcpBufferedInputStream(InputStream in, int size) {
-        super(in);
-        if (size <= 0) {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        internalBuffer = new byte[size];
-    }
-
-    protected void fill() throws IOException {
-        byte[] buffer = internalBuffer;
-        count = 0;
-        position = 0;
-        int n = in.read(buffer, position, buffer.length - position);
-        if (n > 0) {
-            count = n + position;
-        }
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (position >= count) {
-            fill();
-            if (position >= count) {
-                return -1;
-            }
-        }
-        return internalBuffer[position++] & 0xff;
-    }
-
-    private int readStream(byte[] b, int off, int len) throws IOException {
-        int avail = count - position;
-        if (avail <= 0) {
-            if (len >= internalBuffer.length) {
-                return in.read(b, off, len);
-            }
-            fill();
-            avail = count - position;
-            if (avail <= 0) {
-                return -1;
-            }
-        }
-        int cnt = (avail < len) ? avail : len;
-        System.arraycopy(internalBuffer, position, b, off, cnt);
-        position += cnt;
-        return cnt;
-    }
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
-            throw new IndexOutOfBoundsException();
-        } else if (len == 0) {
-            return 0;
-        }
-        int n = 0;
-        for (;;) {
-            int nread = readStream(b, off + n, len - n);
-            if (nread <= 0) {
-                return (n == 0) ? nread : n;
-            }
-            n += nread;
-            if (n >= len) {
-                return n;
-            }
-            // if not closed but no bytes available, return
-            InputStream input = in;
-            if (input != null && input.available() <= 0) {
-                return n;
-            }
-        }
-    }
-
-    @Override
-    public long skip(long n) throws IOException {
-        if (n <= 0) {
-            return 0;
-        }
-        long avail = count - position;
-        if (avail <= 0) {
-            return in.skip(n);
-        }
-        long skipped = (avail < n) ? avail : n;
-        position += skipped;
-        return skipped;
-    }
-
-    @Override
-    public int available() throws IOException {
-        return in.available() + (count - position);
-    }
-
-    @Override
-    public boolean markSupported() {
-        return false;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (in != null) {
-            in.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
deleted file mode 100644
index 82f8c41..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An optimized buffered outputstream for Tcp
- */
-public class TcpBufferedOutputStream extends FilterOutputStream {
-
-    private static final int BUFFER_SIZE = 8192;
-    private final byte[] buffer;
-    private final int bufferlen;
-    private int count;
-
-    /**
-     * Constructor
-     *
-     * @param out
-     */
-    public TcpBufferedOutputStream(OutputStream out) {
-        this(out, BUFFER_SIZE);
-    }
-
-    /**
-     * Creates a new buffered output stream to write data to the specified underlying output
-     * stream with the specified buffer size.
-     *
-     * @param out
-     *        the underlying output stream.
-     * @param size
-     *        the buffer size.
-     * @throws IllegalArgumentException
-     *         if size <= 0.
-     */
-    public TcpBufferedOutputStream(OutputStream out, int size) {
-        super(out);
-        if (size <= 0) {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        buffer = new byte[size];
-        bufferlen = size;
-    }
-
-    /**
-     * write a byte on to the stream
-     *
-     * @param b
-     *        - byte to write
-     * @throws IOException
-     */
-    @Override
-    public void write(int b) throws IOException {
-        if ((bufferlen - count) < 1) {
-            flush();
-        }
-        buffer[count++] = (byte) b;
-    }
-
-    /**
-     * write a byte array to the stream
-     *
-     * @param b
-     *        the byte buffer
-     * @param off
-     *        the offset into the buffer
-     * @param len
-     *        the length of data to write
-     * @throws IOException
-     */
-    @Override
-    public void write(byte b[], int off, int len) throws IOException {
-        if (b != null) {
-            if ((bufferlen - count) < len) {
-                flush();
-            }
-            if (buffer.length >= len) {
-                System.arraycopy(b, off, buffer, count, len);
-                count += len;
-            } else {
-                out.write(b, off, len);
-            }
-        }
-    }
-
-    /**
-     * flush the data to the output stream This doesn't call flush on the underlying
-     * outputstream, because Tcp is particularly efficent at doing this itself ....
-     *
-     * @throws IOException
-     */
-    @Override
-    public void flush() throws IOException {
-        if (count > 0 && out != null) {
-            out.write(buffer, 0, count);
-            count = 0;
-        }
-    }
-
-    /**
-     * close this stream
-     *
-     * @throws IOException
-     */
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
deleted file mode 100644
index bc2cada..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.vertx.java.core.AsyncResult;
-import org.vertx.java.core.AsyncResultHandler;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.impl.DefaultVertxFactory;
-import org.vertx.java.core.net.NetClient;
-import org.vertx.java.core.net.NetSocket;
-
-/**
- * Vertex based TCP transport for raw data packets.
- */
-public class TcpTransport implements Transport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
-
-    private final Vertx vertx;
-    private final NetClient client;
-    private final URI remoteLocation;
-    private final AtomicBoolean connected = new AtomicBoolean();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
-
-    private NetSocket socket;
-
-    private TransportListener listener;
-    private int socketBufferSize = 64 * 1024;
-    private int soTimeout = -1;
-    private int connectTimeout = -1;
-    private int soLinger = Integer.MIN_VALUE;
-    private boolean keepAlive;
-    private boolean tcpNoDelay = true;
-
-    /**
-     * Create a new instance of the transport.
-     *
-     * @param listener
-     *        The TransportListener that will receive data from this Transport instance.
-     * @param remoteLocation
-     *        The remote location where this transport should connection to.
-     */
-    public TcpTransport(TransportListener listener, URI remoteLocation) {
-        this.listener = listener;
-        this.remoteLocation = remoteLocation;
-
-        DefaultVertxFactory vertxFactory = new DefaultVertxFactory();
-        this.vertx = vertxFactory.createVertx();
-        this.client = vertx.createNetClient();
-    }
-
-    @Override
-    public void connect() throws IOException {
-        final CountDownLatch connectLatch = new CountDownLatch(1);
-
-        if (listener == null) {
-            throw new IllegalStateException("A transport listener must be set before connection attempts.");
-        }
-
-        configureNetClient(client);
-
-        try {
-            client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
-                @Override
-                public void handle(AsyncResult<NetSocket> asyncResult) {
-                    if (asyncResult.succeeded()) {
-                        socket = asyncResult.result();
-                        LOG.info("We have connected! Socket is {}", socket);
-
-                        connected.set(true);
-                        connectLatch.countDown();
-
-                        socket.dataHandler(new Handler<Buffer>() {
-                            @Override
-                            public void handle(Buffer event) {
-                                listener.onData(event.getByteBuf());
-                            }
-                        });
-
-                        socket.closeHandler(new Handler<Void>() {
-                            @Override
-                            public void handle(Void event) {
-                                if (!closed.get()) {
-                                    connected.set(false);
-                                    listener.onTransportClosed();
-                                }
-                            }
-                        });
-
-                        socket.exceptionHandler(new Handler<Throwable>() {
-                            @Override
-                            public void handle(Throwable event) {
-                                if (!closed.get()) {
-                                    connected.set(false);
-                                    listener.onTransportError(event);
-                                }
-                            }
-                        });
-
-                    } else {
-                        connected.set(false);
-                        connectionError.set(asyncResult.cause());
-                        connectLatch.countDown();
-                    }
-                }
-            });
-        } catch (Throwable reason) {
-            LOG.info("Failed to connect to target Broker: {}", reason);
-            throw IOExceptionSupport.create(reason);
-        }
-
-        try {
-            connectLatch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-
-        if (connectionError.get() != null) {
-            throw IOExceptionSupport.create(connectionError.get());
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            if (connected.get()) {
-                socket.close();
-                connected.set(false);
-            }
-
-            vertx.stop();
-        }
-    }
-
-    @Override
-    public void send(ByteBuffer output) throws IOException {
-        checkConnected();
-        int length = output.remaining();
-        if (length == 0) {
-            return;
-        }
-
-        byte[] copy = new byte[length];
-        output.get(copy);
-        Buffer sendBuffer = new Buffer(copy);
-
-        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
-    }
-
-    @Override
-    public void send(ByteBuf output) throws IOException {
-        checkConnected();
-        int length = output.readableBytes();
-        if (length == 0) {
-            return;
-        }
-
-        Buffer sendBuffer = new Buffer(output.copy());
-        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
-    }
-
-    /**
-     * Allows a subclass to configure the NetClient beyond what this transport might do.
-     *
-     * @throws IOException if an error occurs.
-     */
-    protected void configureNetClient(NetClient client) throws IOException {
-        client.setSendBufferSize(getSocketBufferSize());
-        client.setReceiveBufferSize(getSocketBufferSize());
-        client.setSoLinger(soLinger);
-        client.setTCPKeepAlive(keepAlive);
-        client.setTCPNoDelay(tcpNoDelay);
-        if (connectTimeout >= 0) {
-            client.setConnectTimeout(connectTimeout);
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return this.connected.get();
-    }
-
-    private void checkConnected() throws IOException {
-        if (!connected.get()) {
-            throw new IOException("Cannot send to a non-connected transport.");
-        }
-    }
-
-    @Override
-    public TransportListener getTransportListener() {
-        return this.listener;
-    }
-
-    @Override
-    public void setTransportListener(TransportListener listener) {
-        if (listener == null) {
-            throw new IllegalArgumentException("Listener cannot be set to null");
-        }
-
-        this.listener = listener;
-    }
-
-    public int getSocketBufferSize() {
-        return socketBufferSize;
-    }
-
-    public void setSocketBufferSize(int socketBufferSize) {
-        this.socketBufferSize = socketBufferSize;
-    }
-
-    public int getSoTimeout() {
-        return soTimeout;
-    }
-
-    public void setSoTimeout(int soTimeout) {
-        this.soTimeout = soTimeout;
-    }
-
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    public int getSoLinger() {
-        return soLinger;
-    }
-
-    public void setSoLinger(int soLinger) {
-        this.soLinger = soLinger;
-    }
-
-    public boolean isKeepAlive() {
-        return keepAlive;
-    }
-
-    public void setKeepAlive(boolean keepAlive) {
-        this.keepAlive = keepAlive;
-    }
-
-    public int getConnectTimeout() {
-        return connectTimeout;
-    }
-
-    public void setConnectTimeout(int connectTimeout) {
-        this.connectTimeout = connectTimeout;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
deleted file mode 100644
index e5f90c3..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-/**
- * Encapsulates all the TCP Transport options in one configuration object.
- */
-public class TcpTransportOptions {
-
-    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
-    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
-    public static final int DEFAULT_TRAFFIC_CLASS = 0;
-    public static final boolean DEFAULT_TCP_NO_DELAY = true;
-    public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
-    public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
-    public static final int DEFAULT_SO_TIMEOUT = -1;
-    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
-
-    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
-    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
-    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
-    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
-    private int soTimeout = DEFAULT_SO_TIMEOUT;
-    private int soLinger = DEFAULT_SO_LINGER;
-    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
-    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
-
-    /**
-     * @return the currently set send buffer size in bytes.
-     */
-    public int getSendBufferSize() {
-        return sendBufferSize;
-    }
-
-    /**
-     * Sets the send buffer size in bytes, the value must be greater than zero
-     * or an {@link IllegalArgumentException} will be thrown.
-     *
-     * @param sendBufferSize
-     *        the new send buffer size for the TCP Transport.
-     *
-     * @throws IllegalArgumentException if the value given is not in the valid range.
-     */
-    public void setSendBufferSize(int sendBufferSize) {
-        if (sendBufferSize <= 0) {
-            throw new IllegalArgumentException("The send buffer size must be > 0");
-        }
-
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    /**
-     * @return the currently configured receive buffer size in bytes.
-     */
-    public int getReceiveBufferSize() {
-        return receiveBufferSize;
-    }
-
-    /**
-     * Sets the receive buffer size in bytes, the value must be greater than zero
-     * or an {@link IllegalArgumentException} will be thrown.
-     *
-     * @param receiveBufferSize
-     *        the new receive buffer size for the TCP Transport.
-     *
-     * @throws IllegalArgumentException if the value given is not in the valid range.
-     */
-    public void setReceiveBufferSize(int receiveBufferSize) {
-        if (receiveBufferSize <= 0) {
-            throw new IllegalArgumentException("The send buffer size must be > 0");
-        }
-
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    /**
-     * @return the currently configured traffic class value.
-     */
-    public int getTrafficClass() {
-        return trafficClass;
-    }
-
-    /**
-     * Sets the traffic class value used by the TCP connection, valid
-     * range is between 0 and 255.
-     *
-     * @param trafficClass
-     *        the new traffic class value.
-     *
-     * @throws IllegalArgumentException if the value given is not in the valid range.
-     */
-    public void setTrafficClass(int trafficClass) {
-        if (trafficClass < 0 || trafficClass > 255) {
-            throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
-        }
-
-        this.trafficClass = trafficClass;
-    }
-
-    public int getSoTimeout() {
-        return soTimeout;
-    }
-
-    public void setSoTimeout(int soTimeout) {
-        this.soTimeout = soTimeout;
-    }
-
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    public int getSoLinger() {
-        return soLinger;
-    }
-
-    public void setSoLinger(int soLinger) {
-        this.soLinger = soLinger;
-    }
-
-    public boolean isTcpKeepAlive() {
-        return tcpKeepAlive;
-    }
-
-    public void setTcpKeepAlive(boolean keepAlive) {
-        this.tcpKeepAlive = keepAlive;
-    }
-
-    public int getConnectTimeout() {
-        return connectTimeout;
-    }
-
-    public void setConnectTimeout(int connectTimeout) {
-        this.connectTimeout = connectTimeout;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
new file mode 100644
index 0000000..463adfb
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class that all TransportFactory implementations must extend.
+ */
+public abstract class TransportFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
+
+    private static final FactoryFinder<TransportFactory> TRANSPORT_FACTORY_FINDER =
+        new FactoryFinder<TransportFactory>(TransportFactory.class,
+            "META-INF/services/" + TransportFactory.class.getPackage().getName().replace(".", "/") + "/");
+
+    /**
+     * Creates an instance of the given Transport and configures it using the
+     * properties set on the given remote broker URI.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     *
+     * @return a new Transport instance.
+     *
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    public abstract Transport createTransport(URI remoteURI) throws Exception;
+
+    /**
+     * @return the name of this Transport.
+     */
+    public abstract String getName();
+
+    /**
+     * Static create method that performs the TransportFactory search and handles the
+     * configuration and setup.
+     *
+     * @param transportKey
+     *        The transport type name used to locate a TransportFactory.
+     * @param remoteURI
+     *        the URI of the remote peer.
+     *
+     * @return a new Transport instance that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    public static Transport create(String transportKey, URI remoteURI) throws Exception {
+        Transport result = null;
+
+        try {
+            TransportFactory factory = findTransportFactory(transportKey);
+            result = factory.createTransport(remoteURI);
+        } catch (Exception ex) {
+            LOG.error("Failed to create Transport instance for {}, due to: {}", remoteURI.getScheme(), ex);
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+
+        return result;
+    }
+
+    /**
+     * Searches for a TransportFactory by using the scheme from the given key.
+     *
+     * The search first checks the local cache of Transport factories before moving on
+     * to search in the class-path.
+     *
+     * @param transportKey
+     *        The transport type name used to locate a TransportFactory.
+     *
+     * @return a Transport factory instance matching the transport key.
+     *
+     * @throws IOException if an error occurs while locating the factory.
+     */
+    public static TransportFactory findTransportFactory(String transportKey) throws IOException {
+        if (transportKey == null) {
+            throw new IOException("No Transport key specified");
+        }
+
+        TransportFactory factory = null;
+        try {
+            factory = TRANSPORT_FACTORY_FINDER.newInstance(transportKey);
+        } catch (Throwable e) {
+            throw new IOException("Transport type NOT recognized: [" + transportKey + "]", e);
+        }
+
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
new file mode 100644
index 0000000..f0e34ca
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ */
+public class TransportOptions {
+
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+    public static final int DEFAULT_TRAFFIC_CLASS = 0;
+    public static final boolean DEFAULT_TCP_NO_DELAY = true;
+    public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+    public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+    public static final int DEFAULT_SO_TIMEOUT = -1;
+    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+
+    public static final TransportOptions DEFAULT_OPTIONS = new TransportOptions();
+
+    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+    private int soTimeout = DEFAULT_SO_TIMEOUT;
+    private int soLinger = DEFAULT_SO_LINGER;
+    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+    /**
+     * @return the currently set send buffer size in bytes.
+     */
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    /**
+     * Sets the send buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param sendBufferSize
+     *        the new send buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setSendBufferSize(int sendBufferSize) {
+        if (sendBufferSize <= 0) {
+            throw new IllegalArgumentException("The send buffer size must be > 0");
+        }
+
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the currently configured receive buffer size in bytes.
+     */
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    /**
+     * Sets the receive buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param receiveBufferSize
+     *        the new receive buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        if (receiveBufferSize <= 0) {
+            throw new IllegalArgumentException("The receive buffer size must be > 0");
+        }
+
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the currently configured traffic class value.
+     */
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    /**
+     * Sets the traffic class value used by the TCP connection, valid
+     * range is between 0 and 255.
+     *
+     * @param trafficClass
+     *        the new traffic class value.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setTrafficClass(int trafficClass) {
+        if (trafficClass < 0 || trafficClass > 255) {
+            throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+        }
+
+        this.trafficClass = trafficClass;
+    }
+
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    public boolean isTcpKeepAlive() {
+        return tcpKeepAlive;
+    }
+
+    public void setTcpKeepAlive(boolean keepAlive) {
+        this.tcpKeepAlive = keepAlive;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
new file mode 100644
index 0000000..3d4a928
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ */
+public class NettyTcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    private Bootstrap bootstrap;
+    private EventLoopGroup group;
+    private Channel channel;
+    private TransportListener listener;
+    private TransportOptions options;
+    private final URI remote;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+    }
+
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup();
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                channel = connectedChannel;
+                channel.pipeline().addLast(new NettyTcpTransportHandler());
+            }
+        });
+
+        configureNetty(bootstrap, getTransportOptions());
+
+        ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
+        future.awaitUninterruptibly();
+
+        if (future.isCancelled()) {
+            throw new IOException("Connection attempt was cancelled");
+        } else if (!future.isSuccess()) {
+            throw IOExceptionSupport.create(future.cause());
+        } else {
+            connected.set(true);
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            if (channel != null) { channel.close(); } // channel is only assigned by the initializer installed in connect()
+            if (group != null) { group.shutdownGracefully(); } // group is only created inside connect()
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        send(Unpooled.wrappedBuffer(output));
+    }
+
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        channel.write(output);
+        channel.flush();
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
+    }
+
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+
+    //----- Internal implementation details ----------------------------------//
+
+    protected void configureNetty(Bootstrap bootstrap, TransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+        // Receive-side options are sized from the receive buffer setting, not the send buffer setting.
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has gone inactive! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.info("Exception on channel! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportError(cause);
+            }
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+            LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
+            listener.onData(buffer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
new file mode 100644
index 0000000..d51013c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * {@link TransportFactory} implementation that produces {@link NettyTcpTransport}
+ * instances, configured from the "transport." options found in the remote URI query.
+ */
+public class NettyTcpTransportFactory extends TransportFactory {
+
+    /**
+     * Creates a Transport for the given remote URI.
+     *
+     * The URI query is parsed into a map, the entries selected by the "transport."
+     * prefix are applied to a new {@link TransportOptions}, and the Transport is
+     * created using the URI returned by {@link PropertyUtil#replaceQuery}.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer, whose query may carry transport options.
+     *
+     * @return the newly created Transport.
+     *
+     * @throws IllegalArgumentException if not all transport options could be applied.
+     * @throws Exception if processing the URI or creating the Transport fails.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        final Map<String, String> queryParameters = PropertyUtil.parseQuery(remoteURI.getQuery());
+        final Map<String, String> transportSettings = PropertyUtil.filterProperties(queryParameters, "transport.");
+
+        // The query is rebuilt only after filtering, from the same map that was handed to the filter.
+        final URI transportURI = PropertyUtil.replaceQuery(remoteURI, queryParameters);
+
+        final TransportOptions configuredOptions = new TransportOptions();
+        if (PropertyUtil.setProperties(configuredOptions, transportSettings)) {
+            return doCreateTransport(transportURI, configuredOptions);
+        }
+
+        throw new IllegalArgumentException(
+            " Not all transport options could be set on the Transport." +
+            " Check the options are spelled correctly." +
+            " Given parameters=[" + transportSettings + "]." +
+            " This provider instance cannot be started.");
+    }
+
+    /**
+     * Instantiates the concrete transport; subclasses may override this to supply
+     * a different {@link NettyTcpTransport} variant.
+     *
+     * @param remoteURI
+     *        the URI handed to the new transport.
+     * @param transportOptions
+     *        the options handed to the new transport.
+     *
+     * @return a new NettyTcpTransport built from the given URI and options.
+     *
+     * @throws Exception if the transport cannot be created.
+     */
+    protected NettyTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new NettyTcpTransport(remoteURI, transportOptions);
+    }
+
+    /**
+     * @return the name of this factory, always "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org