You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/08 22:44:40 UTC
[1/2] qpid-jms git commit: Heavy refactoring of the Transport layer
in the client.
Repository: qpid-jms
Updated Branches:
refs/heads/master 24bd2a919 -> abde5ef20
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
new file mode 100644
index 0000000..bb996f5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.util.InetAddressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
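+/**
+ * Transport that exchanges raw bytes over a plain java.net.Socket, reading incoming data on a dedicated thread.
+ */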
+public class PlainTcpTransport implements Transport, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PlainTcpTransport.class);
+
+ private TransportListener listener;
+ private final URI remoteLocation;
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+ private final Socket socket;
+ private DataOutputStream dataOut;
+ private DataInputStream dataIn;
+ private Thread runner;
+
+ private TransportOptions options;
+
+ private boolean closeAsync = true;
+ private boolean useLocalHost = false;
+ private int ioBufferSize = 8 * 1024;
+
+ /**
+  * Create a new transport instance without a TransportListener; this simply
+  * delegates to the three argument constructor passing a null listener.
+  *
+  * @param remoteLocation
+  *        the URI that defines the remote resource to connect to.
+  * @param options
+  *        the transport options used to configure the socket connection.
+  */
+ public PlainTcpTransport(URI remoteLocation, TransportOptions options) {
+ this(null, remoteLocation, options);
+ }
+
+ /**
+  * Create a new transport instance and eagerly create its (unconnected) socket.
+  * <p>
+  * A failure to create the socket is not thrown from here; it is remembered in
+  * connectionError and surfaces when connect() is called. Note that
+  * createSocketFactory() is invoked during construction.
+  *
+  * @param listener
+  *        the TransportListener that will receive events from this Transport.
+  * @param remoteLocation
+  *        the URI that defines the remote resource to connect to.
+  * @param options
+  *        the transport options used to configure the socket connection.
+  */
+ public PlainTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+     this.options = options;
+     this.listener = listener;
+     this.remoteLocation = remoteLocation;
+
+     Socket created;
+     try {
+         created = createSocketFactory().createSocket();
+     } catch (IOException creationFailure) {
+         // Defer reporting until connect() is attempted.
+         connectionError.set(creationFailure);
+         created = null;
+     }
+     this.socket = created;
+ }
+
+ @Override
+ public void connect() throws IOException {
+ if (connectionError.get() != null) {
+ throw IOExceptionSupport.create(connectionError.get());
+ }
+
+ if (socket == null) {
+ throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+ }
+
+ InetSocketAddress remoteAddress = null;
+
+ if (remoteLocation != null) {
+ String host = resolveHostName(remoteLocation.getHost());
+ remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+ }
+
+ socket.connect(remoteAddress);
+
+ connected.set(true);
+
+ initialiseSocket(socket);
+ initializeStreams();
+
+ runner = new Thread(null, this, "QpidJMS " + getClass().getSimpleName() + ": " + toString());
+ runner.setDaemon(false);
+ runner.start();
+ }
+
+ /**
+  * Closes this transport; only the first call has any effect.
+  * <p>
+  * The connected flag is cleared so that isConnected() reports false, send()
+  * is rejected and the reader loop stops once the transport has been closed.
+  * <p>
+  * Closing the socket can hang if the socket itself is hung, so by default
+  * (closeAsync) the close is performed on a helper thread and this method
+  * waits at most one second for it to complete. Errors raised while closing the
+  * socket are logged and ignored.
+  *
+  * @throws IOException
+  *         declared by the Transport contract; not thrown by this implementation.
+  */
+ @Override
+ public void close() throws IOException {
+     if (!closed.compareAndSet(false, true)) {
+         return;
+     }
+
+     // A closed transport must no longer look connected to senders or the reader loop.
+     connected.set(false);
+
+     if (socket == null) {
+         return;
+     }
+
+     if (!closeAsync) {
+         closeSocketQuietly();
+         return;
+     }
+
+     final CountDownLatch latch = new CountDownLatch(1);
+     final ExecutorService closer = Executors.newSingleThreadExecutor();
+     closer.execute(new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 closeSocketQuietly();
+             } finally {
+                 latch.countDown();
+             }
+         }
+     });
+
+     try {
+         // Bounded wait: give up after a second rather than hang the caller.
+         latch.await(1, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+     } finally {
+         closer.shutdownNow();
+     }
+ }
+
+ /**
+  * Closes the socket, logging and otherwise ignoring any IOException.
+  */
+ private void closeSocketQuietly() {
+     LOG.trace("Closing socket {}", socket);
+     try {
+         socket.close();
+         LOG.debug("Closed socket {}", socket);
+     } catch (IOException e) {
+         LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
+     }
+ }
+
+ /**
+  * Writes the remaining bytes of the given buffer to the socket output stream
+  * and flushes the buffered stream.
+  *
+  * @param output
+  *        the buffer whose remaining bytes are sent.
+  * @throws IOException
+  *         if the transport is not connected or the write fails.
+  */
+ @Override
+ public void send(ByteBuffer output) throws IOException {
+     checkConnected();
+     // Per-packet logging belongs at trace level; info would flood the log on a busy connection.
+     LOG.trace("PlainTcpTransport sending packet of size: {}", output.remaining());
+     WritableByteChannel channel = Channels.newChannel(dataOut);
+     // The channel contract allows partial writes, so keep going until the buffer is drained.
+     while (output.hasRemaining()) {
+         channel.write(output);
+     }
+     dataOut.flush();
+ }
+
+ /**
+  * Sends the content of a Netty buffer by handing its NIO view to
+  * send(ByteBuffer).
+  *
+  * @param output
+  *        the buffer to send.
+  * @throws IOException
+  *         if the transport is not connected or the write fails.
+  */
+ @Override
+ public void send(ByteBuf output) throws IOException {
+     checkConnected();
+     final ByteBuffer nioView = output.nioBuffer();
+     send(nioView);
+ }
+
+ /**
+  * @return the current value of the connected flag, which connect() sets to true on success.
+  */
+ @Override
+ public boolean isConnected() {
+ return this.connected.get();
+ }
+
+ /**
+  * @return the listener currently registered to receive events, or null if none was set.
+  */
+ @Override
+ public TransportListener getTransportListener() {
+ return this.listener;
+ }
+
+ /**
+  * Replaces the listener that receives events from this transport.
+  *
+  * @param listener
+  *        the new listener, must not be null.
+  * @throws IllegalArgumentException
+  *         if the given listener is null.
+  */
+ @Override
+ public void setTransportListener(TransportListener listener) {
+ if (listener == null) {
+ throw new IllegalArgumentException("Listener cannot be set to null");
+ }
+
+ this.listener = listener;
+ }
+
+ /**
+  * @return the options used to configure the socket; when none were supplied
+  *         TransportOptions.DEFAULT_OPTIONS is stored and returned.
+  */
+ public TransportOptions getTransportOptions() {
+ if (options == null) {
+ options = TransportOptions.DEFAULT_OPTIONS;
+ }
+
+ return options;
+ }
+
+ /**
+  * @return whether resolveHostName() maps the local host name to "localhost".
+  */
+ public boolean isUseLocalHost() {
+ return useLocalHost;
+ }
+
+ /**
+  * @param useLocalHost
+  *        true to have resolveHostName() map the local host name to "localhost".
+  */
+ public void setUseLocalHost(boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
+ }
+
+ /**
+  * @return the size handed to the buffered input and output streams created by initializeStreams().
+  */
+ public int getIoBufferSize() {
+ return ioBufferSize;
+ }
+
+ /**
+  * @param ioBufferSize
+  *        the size handed to the buffered input and output streams created by initializeStreams().
+  */
+ public void setIoBufferSize(int ioBufferSize) {
+ this.ioBufferSize = ioBufferSize;
+ }
+
+ /**
+  * @return whether close() closes the socket on a helper thread with a bounded wait.
+  */
+ public boolean isCloseAsync() {
+ return closeAsync;
+ }
+
+ /**
+  * @param closeAsync
+  *        true to close the socket on a helper thread with a bounded wait,
+  *        false to close it directly on the calling thread.
+  */
+ public void setCloseAsync(boolean closeAsync) {
+ this.closeAsync = closeAsync;
+ }
+
+ //---------- Transport internal implementation ---------------------------//
+
+ @Override
+ public void run() {
+ LOG.trace("TCP consumer thread for " + this + " starting");
+ try {
+ while (isConnected()) {
+ doRun();
+ }
+ } catch (IOException e) {
+ connectionError.set(e);
+ onException(e);
+ } catch (Throwable e) {
+ IOException ioe = new IOException("Unexpected error occured: " + e);
+ connectionError.set(ioe);
+ ioe.initCause(e);
+ onException(ioe);
+ }
+ }
+
+ protected void doRun() throws IOException {
+ int size = dataIn.available();
+ if (size <= 0) {
+ try {
+ TimeUnit.NANOSECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ }
+ return;
+ }
+
+ byte[] buffer = new byte[size];
+ dataIn.readFully(buffer);
+ listener.onData(Unpooled.wrappedBuffer(buffer));
+ }
+
+ /**
+  * Forwards an I/O failure to the registered TransportListener, if there is
+  * one. A RuntimeException thrown by the listener is logged at debug level and
+  * not propagated.
+  *
+  * @param e
+  *        the failure to report.
+  */
+ public void onException(IOException e) {
+     if (listener == null) {
+         return;
+     }
+
+     try {
+         listener.onTransportError(e);
+     } catch (RuntimeException listenerFailure) {
+         LOG.debug("Unexpected runtime exception: " + listenerFailure, listenerFailure);
+     }
+ }
+
+ /**
+  * Supplies the factory used by the constructor to create the socket. This
+  * implementation returns the JVM default SocketFactory; being protected, it
+  * can be overridden by subclasses.
+  *
+  * @return the SocketFactory used to create this transport's socket.
+  * @throws IOException
+  *         declared for overriding implementations; not thrown here.
+  */
+ protected SocketFactory createSocketFactory() throws IOException {
+ return SocketFactory.getDefault();
+ }
+
+ protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+ TransportOptions options = getTransportOptions();
+
+ try {
+ sock.setReceiveBufferSize(options.getReceiveBufferSize());
+ } catch (SocketException se) {
+ LOG.warn("Cannot set socket receive buffer size = {}", options.getReceiveBufferSize());
+ LOG.debug("Cannot set socket receive buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+ }
+
+ try {
+ sock.setSendBufferSize(options.getSendBufferSize());
+ } catch (SocketException se) {
+ LOG.warn("Cannot set socket send buffer size = {}", options.getSendBufferSize());
+ LOG.debug("Cannot set socket send buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+ }
+
+ sock.setSoTimeout(options.getSoTimeout());
+ sock.setKeepAlive(options.isTcpKeepAlive());
+ sock.setTcpNoDelay(options.isTcpNoDelay());
+
+ if (options.getSoLinger() > 0) {
+ sock.setSoLinger(true, options.getSoLinger());
+ } else {
+ sock.setSoLinger(false, 0);
+ }
+ }
+
+ /**
+  * Wraps the socket's input and output streams in buffered streams of
+  * ioBufferSize bytes and exposes them as dataIn and dataOut.
+  *
+  * @throws IOException
+  *         if the streams cannot be created; any Throwable raised while doing
+  *         so is converted using IOExceptionSupport.
+  */
+ protected void initializeStreams() throws IOException {
+     try {
+         this.dataIn = new DataInputStream(
+             new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize));
+         this.dataOut = new DataOutputStream(
+             new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize));
+     } catch (Throwable cause) {
+         throw IOExceptionSupport.create(cause);
+     }
+ }
+
+ protected String resolveHostName(String host) throws UnknownHostException {
+ if (isUseLocalHost()) {
+ String localName = InetAddressUtil.getLocalHostName();
+ if (localName != null && localName.equals(host)) {
+ return "localhost";
+ }
+ }
+ return host;
+ }
+
+ /**
+  * Guards send operations: does nothing when the connected flag is set.
+  *
+  * @throws IOException
+  *         if the transport is not connected.
+  */
+ private void checkConnected() throws IOException {
+     if (connected.get()) {
+         return;
+     }
+     throw new IOException("Cannot send to a non-connected transport.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
new file mode 100644
index 0000000..3894408
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * TransportFactory that builds PlainTcpTransport instances, configuring them
+ * from the "transport." prefixed options found in the remote URI's query.
+ */
+public class PlainTcpTransportFactory extends TransportFactory {
+
+    /**
+     * Creates a transport for the given URI.
+     * <p>
+     * The query is parsed, the "transport." options are filtered out and applied
+     * to a new TransportOptions, and the URI's query is replaced using the parsed
+     * option map before the transport is created.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer, optionally carrying transport options.
+     * @return the newly created, not yet connected, transport.
+     * @throws IllegalArgumentException
+     *         if not every transport option could be applied.
+     * @throws Exception
+     *         if the URI cannot be processed or the transport cannot be created.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        final Map<String, String> queryOptions = PropertyUtil.parseQuery(remoteURI.getQuery());
+        final Map<String, String> transportURIOptions = PropertyUtil.filterProperties(queryOptions, "transport.");
+
+        remoteURI = PropertyUtil.replaceQuery(remoteURI, queryOptions);
+
+        final TransportOptions transportOptions = new TransportOptions();
+        final boolean allApplied = PropertyUtil.setProperties(transportOptions, transportURIOptions);
+        if (allApplied) {
+            return doCreateTransport(remoteURI, transportOptions);
+        }
+
+        throw new IllegalArgumentException(
+            " Not all transport options could be set on the Transport." +
+            " Check the options are spelled correctly." +
+            " Given parameters=[" + transportURIOptions + "]." +
+            " This provider instance cannot be started.");
+    }
+
+    /**
+     * Instantiates the concrete transport; a hook for subclasses that need a
+     * different PlainTcpTransport variant.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     * @param transportOptions
+     *        the options to configure the transport with.
+     * @return a new PlainTcpTransport.
+     * @throws Exception
+     *         if the transport cannot be created.
+     */
+    protected PlainTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new PlainTcpTransport(remoteURI, transportOptions);
+    }
+
+    /**
+     * @return the name of this factory, "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
new file mode 100644
index 0000000..ee4bf5c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An optimized buffered input stream for Tcp
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
+ protected byte internalBuffer[];
+ protected int count;
+ protected int position;
+
+ /**
+  * Creates a buffered stream over the given stream using the default buffer
+  * size of 8192 bytes.
+  *
+  * @param in
+  *        the underlying input stream.
+  */
+ public TcpBufferedInputStream(InputStream in) {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+  * Creates a buffered stream over the given stream with an internal buffer of
+  * the requested size.
+  *
+  * @param in
+  *        the underlying input stream.
+  * @param size
+  *        the size of the internal buffer in bytes, must be positive.
+  * @throws IllegalArgumentException
+  *         if size is less than or equal to zero.
+  */
+ public TcpBufferedInputStream(InputStream in, int size) {
+ super(in);
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ internalBuffer = new byte[size];
+ }
+
+ protected void fill() throws IOException {
+ byte[] buffer = internalBuffer;
+ count = 0;
+ position = 0;
+ int n = in.read(buffer, position, buffer.length - position);
+ if (n > 0) {
+ count = n + position;
+ }
+ }
+
+ /**
+  * Reads a single byte, refilling the internal buffer when it is exhausted.
+  *
+  * @return the byte as a value from 0 to 255, or -1 when a refill yields no data.
+  * @throws IOException
+  *         if reading the underlying stream fails.
+  */
+ @Override
+ public int read() throws IOException {
+     if (position >= count) {
+         fill();
+     }
+     if (position >= count) {
+         return -1;
+     }
+     final int value = internalBuffer[position++] & 0xff;
+     return value;
+ }
+
+ private int readStream(byte[] b, int off, int len) throws IOException {
+ int avail = count - position;
+ if (avail <= 0) {
+ if (len >= internalBuffer.length) {
+ return in.read(b, off, len);
+ }
+ fill();
+ avail = count - position;
+ if (avail <= 0) {
+ return -1;
+ }
+ }
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(internalBuffer, position, b, off, cnt);
+ position += cnt;
+ return cnt;
+ }
+
+ /**
+  * Reads up to len bytes into the given array. Reading continues until the
+  * request is satisfied, a read step returns no data, or the underlying stream
+  * reports that nothing further is available.
+  *
+  * @param b
+  *        the destination array.
+  * @param off
+  *        the offset in the array at which to start storing bytes.
+  * @param len
+  *        the maximum number of bytes to read.
+  * @return the number of bytes read; when nothing was read, the result of the
+  *         first read step (for example -1).
+  * @throws IndexOutOfBoundsException
+  *         if off and len do not describe a valid range of the array.
+  * @throws IOException
+  *         if reading the underlying stream fails.
+  */
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+     final int end = off + len;
+     final int capacity = b.length;
+     if (off < 0 || len < 0 || end < 0 || capacity - end < 0) {
+         throw new IndexOutOfBoundsException();
+     }
+     if (len == 0) {
+         return 0;
+     }
+
+     int total = 0;
+     while (true) {
+         final int got = readStream(b, off + total, len - total);
+         if (got <= 0) {
+             return (total > 0) ? total : got;
+         }
+         total += got;
+         if (total >= len) {
+             return total;
+         }
+         // if not closed but no bytes available, return
+         final InputStream source = in;
+         if (source != null && source.available() <= 0) {
+             return total;
+         }
+     }
+ }
+
+ /**
+  * Skips bytes. When the internal buffer holds data, at most the buffered bytes
+  * are skipped; with an empty buffer the request goes to the underlying stream.
+  *
+  * @param n
+  *        the number of bytes to skip.
+  * @return the number of bytes actually skipped, 0 when n is not positive.
+  * @throws IOException
+  *         if skipping on the underlying stream fails.
+  */
+ @Override
+ public long skip(long n) throws IOException {
+     if (n <= 0) {
+         return 0;
+     }
+
+     final long buffered = count - position;
+     if (buffered <= 0) {
+         return in.skip(n);
+     }
+
+     final long toSkip = Math.min(buffered, n);
+     position += toSkip;
+     return toSkip;
+ }
+
+ /**
+  * @return the number of bytes reported available by the underlying stream plus
+  *         the bytes still held in the internal buffer.
+  */
+ @Override
+ public int available() throws IOException {
+ return in.available() + (count - position);
+ }
+
+ /**
+  * @return always false.
+  */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ /**
+  * Closes the underlying stream, if there is one.
+  */
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
new file mode 100644
index 0000000..84359ae
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An optimized buffered outputstream for Tcp
+ */
+public class TcpBufferedOutputStream extends FilterOutputStream {
+
+ private static final int BUFFER_SIZE = 8192;
+ private final byte[] buffer;
+ private final int bufferlen;
+ private int count;
+
+ /**
+  * Creates a new buffered output stream that writes to the specified underlying
+  * output stream using the default buffer size of 8192 bytes.
+  *
+  * @param out
+  *        the underlying output stream.
+  */
+ public TcpBufferedOutputStream(OutputStream out) {
+ this(out, BUFFER_SIZE);
+ }
+
+ /**
+  * Creates a new buffered output stream to write data to the specified underlying output
+  * stream with the specified buffer size.
+  *
+  * @param out
+  *        the underlying output stream.
+  * @param size
+  *        the buffer size in bytes, must be positive.
+  * @throws IllegalArgumentException
+  *         if size <= 0.
+  */
+ public TcpBufferedOutputStream(OutputStream out, int size) {
+ super(out);
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buffer = new byte[size];
+ bufferlen = size;
+ }
+
+ /**
+  * Buffers a single byte, flushing the buffer first when it is full.
+  *
+  * @param b
+  *        the byte to write; only the low eight bits are used.
+  * @throws IOException
+  *         if flushing the buffer fails.
+  */
+ @Override
+ public void write(int b) throws IOException {
+     final boolean full = count >= bufferlen;
+     if (full) {
+         flush();
+     }
+     buffer[count++] = (byte) b;
+ }
+
+ /**
+  * Writes part of a byte array. The buffer is flushed first when the data does
+  * not fit into the remaining space. Data that fits into the buffer is then
+  * buffered; data larger than the whole buffer is written straight to the
+  * underlying stream. A null array is ignored.
+  *
+  * @param b
+  *        the byte array holding the data.
+  * @param off
+  *        the offset of the first byte to write.
+  * @param len
+  *        the number of bytes to write.
+  * @throws IOException
+  *         if writing to the underlying stream fails.
+  */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+     if (b == null) {
+         return;
+     }
+
+     final int free = bufferlen - count;
+     if (len > free) {
+         flush();
+     }
+
+     if (len > buffer.length) {
+         // Too large to ever fit: skip the buffer.
+         out.write(b, off, len);
+         return;
+     }
+
+     System.arraycopy(b, off, buffer, count, len);
+     count += len;
+ }
+
+ /**
+  * Writes any buffered bytes to the underlying output stream and empties the
+  * buffer. This doesn't call flush on the underlying output stream, because
+  * Tcp is particularly efficient at doing this itself.
+  *
+  * @throws IOException
+  *         if writing to the underlying stream fails.
+  */
+ @Override
+ public void flush() throws IOException {
+     if (count <= 0 || out == null) {
+         return;
+     }
+     out.write(buffer, 0, count);
+     count = 0;
+ }
+
+ /**
+  * Closes this stream by delegating to the superclass close().
+  *
+  * @throws IOException
+  *         if closing fails.
+  */
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
new file mode 100644
index 0000000..fd312f8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.vertx.java.core.net.NetClient;
+
+/**
+ * Provides SSL configuration to the Vert.x NetClient object used by the underlying
+ * TCP based Transport.
+ */
+public class SslTransport extends TcpTransport {
+
+    // TODO - remove with SSL configuration placed in Transport options.
+    private JmsSslContext context;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(URI remoteLocation, TransportOptions options) {
+        super(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        super(listener, remoteLocation, options);
+    }
+
+    /**
+     * Applies the base TCP configuration and then enables SSL on the client using
+     * the key store and trust store settings of the configured JmsSslContext.
+     *
+     * @param client
+     *        the NetClient to configure.
+     * @param options
+     *        the transport options passed on to the base configuration.
+     * @throws IOException
+     *         if no JmsSslContext has been set on this transport, or if the base
+     *         configuration fails.
+     */
+    @Override
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        // Fail with a clear message instead of a NullPointerException when setContext() was never called.
+        final JmsSslContext sslContext = context;
+        if (sslContext == null) {
+            throw new IOException("Cannot configure SSL transport: no JmsSslContext has been set.");
+        }
+
+        super.configureNetClient(client, options);
+
+        client.setSSL(true);
+        client.setKeyStorePath(sslContext.getKeyStoreLocation());
+        client.setKeyStorePassword(sslContext.getKeyStorePassword());
+        client.setTrustStorePath(sslContext.getTrustStoreLocation());
+        client.setTrustStorePassword(sslContext.getTrustStorePassword());
+    }
+
+    /**
+     * @return the SSL context used to configure the client, or null if none was set.
+     */
+    public JmsSslContext getContext() {
+        return context;
+    }
+
+    /**
+     * @param context
+     *        the SSL context whose key store and trust store settings are applied
+     *        when the client is configured.
+     */
+    public void setContext(JmsSslContext context) {
+        this.context = context;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
new file mode 100644
index 0000000..b7c738c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+
+/**
+ * TransportFactory that creates SslTransport instances configured with the
+ * current JmsSslContext.
+ */
+public class SslTransportFactory extends TcpTransportFactory {
+
+    /**
+     * Creates an SslTransport for the given URI and assigns it the SSL context
+     * returned by JmsSslContext.getCurrentSslContext().
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     * @param transportOptions
+     *        the options to configure the transport with.
+     * @return the new SSL transport.
+     * @throws Exception
+     *         if the transport cannot be created.
+     */
+    @Override
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        final SslTransport sslTransport = new SslTransport(remoteURI, transportOptions);
+        final JmsSslContext currentContext = JmsSslContext.getCurrentSslContext();
+        sslTransport.setContext(currentContext);
+        return sslTransport;
+    }
+
+    /**
+     * @return the name of this factory, "SSL".
+     */
+    @Override
+    public String getName() {
+        return "SSL";
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
new file mode 100644
index 0000000..e824ec4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.impl.DefaultVertxFactory;
+import org.vertx.java.core.net.NetClient;
+import org.vertx.java.core.net.NetSocket;
+
+/**
+ * Vert.x based TCP transport for raw data packets.
+ *
+ * Data read from the socket, socket close events and socket errors are forwarded
+ * to the configured {@link TransportListener}.  Outgoing data is copied into a
+ * Vert.x {@link Buffer} and sent to the socket's write handler over the Vert.x
+ * event bus.
+ */
+public class TcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+    private final Vertx vertx;
+    private final NetClient client;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private NetSocket socket;
+    private TransportListener listener;
+    private TransportOptions options;
+
+    /**
+     * Create a new transport instance without a listener; one must be assigned
+     * via {@link #setTransportListener(TransportListener)} before connecting.
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+
+        DefaultVertxFactory vertxFactory = new DefaultVertxFactory();
+        this.vertx = vertxFactory.createVertx();
+        this.client = vertx.createNetClient();
+    }
+
+    /**
+     * Starts an asynchronous connect to the host and port of the remote location
+     * and blocks the calling thread until that attempt has either succeeded or
+     * failed.
+     *
+     * @throws IllegalStateException if no TransportListener has been set.
+     * @throws IOException if the connect attempt fails, or if the calling thread
+     *         is interrupted while waiting for the attempt to complete.
+     */
+    @Override
+    public void connect() throws IOException {
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        // Discard the failure recorded by any earlier attempt, otherwise a retry
+        // that succeeds would still be reported as failed below.
+        connectionError.set(null);
+
+        configureNetClient(client, getTransportOptions());
+
+        try {
+            client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
+                @Override
+                public void handle(AsyncResult<NetSocket> asyncResult) {
+                    if (asyncResult.succeeded()) {
+                        socket = asyncResult.result();
+                        LOG.info("We have connected! Socket is {}", socket);
+
+                        connected.set(true);
+                        connectLatch.countDown();
+
+                        socket.dataHandler(new Handler<Buffer>() {
+                            @Override
+                            public void handle(Buffer event) {
+                                listener.onData(event.getByteBuf());
+                            }
+                        });
+
+                        socket.closeHandler(new Handler<Void>() {
+                            @Override
+                            public void handle(Void event) {
+                                // A close triggered by our own close() is not reported.
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportClosed();
+                                }
+                            }
+                        });
+
+                        socket.exceptionHandler(new Handler<Throwable>() {
+                            @Override
+                            public void handle(Throwable event) {
+                                // Errors raised after our own close() are not reported.
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportError(event);
+                                }
+                            }
+                        });
+
+                    } else {
+                        connected.set(false);
+                        connectionError.set(asyncResult.cause());
+                        connectLatch.countDown();
+                    }
+                }
+            });
+        } catch (Throwable reason) {
+            // The location fills the placeholder; the trailing Throwable is
+            // handed to the logger as the exception.
+            LOG.info("Failed to connect to target Broker: {}", remoteLocation, reason);
+            throw IOExceptionSupport.create(reason);
+        }
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag and fail the call: the outcome of the
+            // attempt is unknown, so returning normally would wrongly signal
+            // a successful connect.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+
+        final Throwable failure = connectionError.get();
+        if (failure != null) {
+            throw IOExceptionSupport.create(failure);
+        }
+    }
+
+    /**
+     * Closes the socket if it is currently connected and stops the Vert.x
+     * instance created by this transport.  Only the first call has any effect.
+     *
+     * @throws IOException declared by the Transport interface.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            try {
+                if (connected.compareAndSet(true, false)) {
+                    socket.close();
+                }
+            } finally {
+                // Always stop Vert.x, even when closing the socket fails.
+                vertx.stop();
+            }
+        }
+    }
+
+    /**
+     * Sends the remaining bytes of the given buffer; an empty buffer is ignored.
+     * The bytes are copied out of the buffer before being sent.
+     *
+     * @param output
+     *        the buffer whose remaining bytes are to be sent.
+     *
+     * @throws IOException if the transport is not connected.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        int length = output.remaining();
+        if (length == 0) {
+            return;
+        }
+
+        byte[] copy = new byte[length];
+        output.get(copy);
+        Buffer sendBuffer = new Buffer(copy);
+
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Sends a copy of the readable bytes of the given buffer; a buffer with no
+     * readable bytes is ignored.
+     *
+     * @param output
+     *        the buffer whose readable bytes are to be sent.
+     *
+     * @throws IOException if the transport is not connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        Buffer sendBuffer = new Buffer(output.copy());
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Allows a subclass to configure the NetClient beyond what this transport might do.
+     *
+     * @param client
+     *        the NetClient that will be used for the connect attempt.
+     * @param options
+     *        the options whose buffer sizes, linger, keep-alive, no-delay and
+     *        connect timeout values are applied to the client.
+     *
+     * @throws IOException if an error occurs.
+     */
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        client.setSendBufferSize(options.getSendBufferSize());
+        client.setReceiveBufferSize(options.getReceiveBufferSize());
+        client.setSoLinger(options.getSoLinger());
+        client.setTCPKeepAlive(options.isTcpKeepAlive());
+        client.setTCPNoDelay(options.isTcpNoDelay());
+        // A negative connect timeout is not applied to the client.
+        if (options.getConnectTimeout() >= 0) {
+            client.setConnectTimeout(options.getConnectTimeout());
+        }
+    }
+
+    /**
+     * @return true if the transport is currently marked as connected.
+     */
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    /**
+     * Guards the send operations.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    /**
+     * @return the listener that receives events from this transport, may be null.
+     */
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    /**
+     * Assigns the listener that receives events from this transport.
+     *
+     * @param listener
+     *        the new listener, must not be null.
+     *
+     * @throws IllegalArgumentException if the given listener is null.
+     */
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    /**
+     * @return the options used to configure the TCP socket; falls back to
+     *         {@code TransportOptions.DEFAULT_OPTIONS} when none were supplied.
+     */
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
new file mode 100644
index 0000000..8385dff
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Factory for creating the Vert.x based TCP Transport
+ */
+public class TcpTransportFactory extends TransportFactory {
+
+    /**
+     * Creates a Transport for the given URI.  The URI query is parsed, the
+     * entries selected with the "transport." prefix are applied to a fresh
+     * {@link TransportOptions}, and the transport is created from the URI
+     * rebuilt with {@code PropertyUtil.replaceQuery}.
+     *
+     * @param remoteURI
+     *        the location of the remote peer, optionally carrying transport options.
+     *
+     * @return the Transport produced by {@link #doCreateTransport(URI, TransportOptions)}.
+     *
+     * @throws IllegalArgumentException if not all of the transport options could be applied.
+     * @throws Exception if parsing the URI or creating the transport fails.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        final Map<String, String> queryParameters = PropertyUtil.parseQuery(remoteURI.getQuery());
+        final Map<String, String> transportSettings = PropertyUtil.filterProperties(queryParameters, "transport.");
+
+        final URI targetURI = PropertyUtil.replaceQuery(remoteURI, queryParameters);
+
+        final TransportOptions configuredOptions = new TransportOptions();
+        final boolean allApplied = PropertyUtil.setProperties(configuredOptions, transportSettings);
+        if (!allApplied) {
+            throw new IllegalArgumentException(
+                " Not all transport options could be set on the Transport." +
+                " Check the options are spelled correctly." +
+                " Given parameters=[" + transportSettings + "]." +
+                " This provider instance cannot be started.");
+        }
+
+        return doCreateTransport(targetURI, configuredOptions);
+    }
+
+    /**
+     * Creation hook that subclasses may override to supply a different transport.
+     *
+     * @param remoteURI
+     *        the remote location handed to the transport.
+     * @param transportOptions
+     *        the options handed to the transport.
+     *
+     * @return a new {@link TcpTransport} for the given location and options.
+     *
+     * @throws Exception declared so that overriding implementations may fail.
+     */
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new TcpTransport(remoteURI, transportOptions);
+    }
+
+    /**
+     * @return the name of this factory, always "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
new file mode 100644
index 0000000..eed5a6b
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.SslTransportFactory
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
new file mode 100644
index 0000000..533bcd1
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.TcpTransportFactory
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
index f4c5918..84197a4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
@@ -29,9 +29,9 @@ import java.util.List;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.NettyTcpTransport;
-import org.apache.qpid.jms.transports.TcpTransportOptions;
+import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
private final List<ByteBuf> data = new ArrayList<ByteBuf>();
private final TransportListener testListener = new NettyTransportListener();
- private final TcpTransportOptions testOptions = new TcpTransportOptions();
+ private final TransportOptions testOptions = new TransportOptions();
@Test(timeout = 60 * 1000)
public void testConnectToServer() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: Heavy refactoring of the Transport layer
in the client.
Posted by ta...@apache.org.
Heavy refactoring of the Transport layer in the client.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/abde5ef2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/abde5ef2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/abde5ef2
Branch: refs/heads/master
Commit: abde5ef20cc173f50d05afc812b93ad0a9cc892d
Parents: 24bd2a9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 8 16:44:07 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 8 16:44:07 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 53 ++-
.../qpid/jms/provider/amqp/AmqpSslProvider.java | 47 ---
.../provider/amqp/AmqpSslProviderFactory.java | 7 +-
.../qpid/jms/transports/NettyTcpTransport.java | 197 ----------
.../qpid/jms/transports/RawTcpTransport.java | 377 -------------------
.../qpid/jms/transports/SslTransport.java | 59 ---
.../jms/transports/TcpBufferedInputStream.java | 139 -------
.../jms/transports/TcpBufferedOutputStream.java | 126 -------
.../qpid/jms/transports/TcpTransport.java | 279 --------------
.../jms/transports/TcpTransportOptions.java | 153 --------
.../qpid/jms/transports/TransportFactory.java | 110 ++++++
.../qpid/jms/transports/TransportOptions.java | 155 ++++++++
.../jms/transports/netty/NettyTcpTransport.java | 224 +++++++++++
.../netty/NettyTcpTransportFactory.java | 63 ++++
.../jms/transports/plain/PlainTcpTransport.java | 362 ++++++++++++++++++
.../plain/PlainTcpTransportFactory.java | 63 ++++
.../plain/TcpBufferedInputStream.java | 139 +++++++
.../plain/TcpBufferedOutputStream.java | 126 +++++++
.../qpid/jms/transports/vertx/SslTransport.java | 86 +++++
.../transports/vertx/SslTransportFactory.java | 42 +++
.../qpid/jms/transports/vertx/TcpTransport.java | 254 +++++++++++++
.../transports/vertx/TcpTransportFactory.java | 63 ++++
.../services/org/apache/qpid/jms/transports/ssl | 17 +
.../services/org/apache/qpid/jms/transports/tcp | 17 +
.../jms/test/netty/NettyTcpTransportTest.java | 6 +-
25 files changed, 1753 insertions(+), 1411 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 98c788c..e68bc6e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -45,10 +45,9 @@ import org.apache.qpid.jms.provider.AbstractProvider;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
-import org.apache.qpid.jms.transports.TcpTransport;
+import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
@@ -84,9 +83,11 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
// NOTE: Limit default channel max to signed short range to deal with
// brokers that don't currently handle the unsigned range well.
private static final int DEFAULT_CHANNEL_MAX = 32767;
+ private static final String DEFAULT_TRANSPORT_KEY = "tcp";
private AmqpConnection connection;
private org.apache.qpid.jms.transports.Transport transport;
+ private String transportKey = DEFAULT_TRANSPORT_KEY;
private boolean traceFrames;
private boolean traceBytes;
private boolean presettleConsumers;
@@ -125,25 +126,12 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
public void connect() throws IOException {
checkClosed();
- transport = createTransport(getRemoteURI());
-
- Map<String, String> map = null;
try {
- map = PropertyUtil.parseQuery(remoteURI.getQuery());
+ transport = TransportFactory.create(getTransportKey(), getRemoteURI());
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
- Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "transport.");
-
- if (!PropertyUtil.setProperties(transport, providerOptions)) {
- String msg = ""
- + " Not all transport options could be set on the AMQP Provider transport."
- + " Check the options are spelled correctly."
- + " Given parameters=[" + providerOptions + "]."
- + " This provider instance cannot be started.";
- throw new IOException(msg);
- }
-
+ transport.setTransportListener(this);
transport.connect();
}
@@ -593,19 +581,6 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
});
}
- /**
- * Provides an extension point for subclasses to insert other types of transports such
- * as SSL etc.
- *
- * @param remoteLocation
- * The remote location where the transport should attempt to connect.
- *
- * @return the newly created transport instance.
- */
- protected org.apache.qpid.jms.transports.Transport createTransport(URI remoteLocation) {
- return new TcpTransport(this, remoteLocation);
- }
-
private void updateTracer() {
if (isTraceFrames()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -881,4 +856,22 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
public void setChannelMax(int channelMax) {
this.channelMax = channelMax;
}
+
+ /**
+ * @return the transportKey that will be used to create the network level connection.
+ */
+ public String getTransportKey() {
+ return transportKey;
+ }
+
+ /**
+ * Sets the transport key used to lookup a Transport instance when an attempt
+ * is made to connect to a remote peer.
+ *
+ * @param transportKey
+ * the transportKey to use when looking up a Transport.
+ */
+ void setTransportKey(String transportKey) {
+ this.transportKey = transportKey;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
deleted file mode 100644
index af7fe7f..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.qpid.jms.JmsSslContext;
-import org.apache.qpid.jms.transports.SslTransport;
-import org.apache.qpid.jms.transports.Transport;
-
-/**
- * AmqpProvider extension that enables SSL based transports.
- */
-public class AmqpSslProvider extends AmqpProvider {
-
- private final JmsSslContext sslContext;
-
- public AmqpSslProvider(URI remoteURI) {
- super(remoteURI);
- this.sslContext = JmsSslContext.getCurrentSslContext();
- }
-
- public AmqpSslProvider(URI remoteURI, Map<String, String> extraOptions) {
- super(remoteURI, extraOptions);
- this.sslContext = JmsSslContext.getCurrentSslContext();
- }
-
- @Override
- protected Transport createTransport(URI remoteLocation) {
- return new SslTransport(this, remoteLocation, sslContext);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
index 01a4f85..a19af00 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
@@ -27,6 +27,11 @@ public class AmqpSslProviderFactory extends AmqpProviderFactory {
@Override
public Provider createProvider(URI remoteURI) throws Exception {
- return new AmqpSslProvider(remoteURI);
+ AmqpProvider provider = new AmqpProvider(remoteURI);
+ // TODO - Would be better if we could do away with this and define
+ // the transport key in the properties file used to find the
+ // AmqpProcvider instance.
+ provider.setTransportKey("ssl");
+ return provider;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
deleted file mode 100644
index 49637c7..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
-
-/**
- * TCP based transport that uses Netty as the underlying IO layer.
- */
-public class NettyTcpTransport implements Transport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
-
- private Bootstrap bootstrap;
- private EventLoopGroup group;
- private Channel channel;
- private TransportListener listener;
- private final TcpTransportOptions options;
- private final URI remote;
-
- private final AtomicBoolean connected = new AtomicBoolean();
- private final AtomicBoolean closed = new AtomicBoolean();
-
- /**
- * Create a new transport instance
- *
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) {
- this.options = options;
- this.listener = listener;
- this.remote = remoteLocation;
- }
-
- @Override
- public void connect() throws IOException {
-
- if (listener == null) {
- throw new IllegalStateException("A transport listener must be set before connection attempts.");
- }
-
- group = new NioEventLoopGroup();
-
- bootstrap = new Bootstrap();
- bootstrap.group(group);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<Channel>() {
-
- @Override
- public void initChannel(Channel connectedChannel) throws Exception {
- channel = connectedChannel;
- channel.pipeline().addLast(new NettyTcpTransportHandler());
- }
- });
-
- configureNetty(bootstrap, options);
-
- ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
- future.awaitUninterruptibly();
-
- if (future.isCancelled()) {
- throw new IOException("Connection attempt was cancelled");
- } else if (!future.isSuccess()) {
- throw IOExceptionSupport.create(future.cause());
- } else {
- connected.set(true);
- }
- }
-
- @Override
- public boolean isConnected() {
- return connected.get();
- }
-
- @Override
- public void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- channel.close();
- group.shutdownGracefully();
- }
- }
-
- @Override
- public void send(ByteBuffer output) throws IOException {
- send(Unpooled.wrappedBuffer(output));
- }
-
- @Override
- public void send(ByteBuf output) throws IOException {
- channel.write(output);
- channel.flush();
- }
-
- @Override
- public TransportListener getTransportListener() {
- return listener;
- }
-
- @Override
- public void setTransportListener(TransportListener listener) {
- this.listener = listener;
- }
-
- //----- Internal implementation details ----------------------------------//
-
- protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) {
- bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
- bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
- bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
- bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
-
- if (options.getSendBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
- }
-
- if (options.getReceiveBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_RCVBUF, options.getSendBufferSize());
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getSendBufferSize()));
- }
-
- if (options.getTrafficClass() != -1) {
- bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
- }
- }
-
- //----- Handle connection events -----------------------------------------//
-
- private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
- @Override
- public void channelActive(ChannelHandlerContext context) throws Exception {
- LOG.info("Channel has become active! Channel is {}", context.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
- LOG.info("Channel has gone inactive! Channel is {}", context.channel());
- if (!closed.get()) {
- connected.set(false);
- listener.onTransportClosed();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
- LOG.info("Exception on channel! Channel is {}", context.channel());
- if (!closed.get()) {
- connected.set(false);
- listener.onTransportError(cause);
- }
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
- LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
- listener.onData(buffer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
deleted file mode 100644
index 210da61..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.SocketFactory;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.jms.util.InetAddressUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class RawTcpTransport implements Transport, Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(RawTcpTransport.class);
-
- private TransportListener listener;
- private final URI remoteLocation;
- private final AtomicBoolean connected = new AtomicBoolean();
- private final AtomicBoolean closed = new AtomicBoolean();
- private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
-
- private final Socket socket;
- private DataOutputStream dataOut;
- private DataInputStream dataIn;
- private Thread runner;
-
- private boolean closeAsync = true;
- private int socketBufferSize = 64 * 1024;
- private int soTimeout = 0;
- private int soLinger = Integer.MIN_VALUE;
- private Boolean keepAlive;
- private Boolean tcpNoDelay = true;
- private boolean useLocalHost = false;
- private int ioBufferSize = 8 * 1024;
-
- /**
- * Create a new instance of the transport.
- *
- * @param listener
- * The TransportListener that will receive data from this Transport instance.
- * @param remoteLocation
- * The remote location where this transport should connection to.
- */
- public RawTcpTransport(TransportListener listener, URI remoteLocation) {
- this.listener = listener;
- this.remoteLocation = remoteLocation;
-
- Socket temp = null;
- try {
- temp = createSocketFactory().createSocket();
- } catch (IOException e) {
- connectionError.set(e);
- }
-
- this.socket = temp;
- }
-
- @Override
- public void connect() throws IOException {
- if (connectionError.get() != null) {
- throw IOExceptionSupport.create(connectionError.get());
- }
-
- if (socket == null) {
- throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
- }
-
- InetSocketAddress remoteAddress = null;
-
- if (remoteLocation != null) {
- String host = resolveHostName(remoteLocation.getHost());
- remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
- }
-
- socket.connect(remoteAddress);
-
- connected.set(true);
-
- initialiseSocket(socket);
- initializeStreams();
-
- runner = new Thread(null, this, "QpidJMS RawTcpTransport: " + toString());
- runner.setDaemon(false);
- runner.start();
- }
-
- @Override
- public void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- if (socket == null) {
- return;
- }
-
- // Closing the streams flush the sockets before closing.. if the socket
- // is hung.. then this hangs the close so we support an asynchronous close
- // by default which will timeout if the close doesn't happen after a delay.
- if (closeAsync) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- final ExecutorService closer = Executors.newSingleThreadExecutor();
- closer.execute(new Runnable() {
- @Override
- public void run() {
- LOG.trace("Closing socket {}", socket);
- try {
- socket.close();
- LOG.debug("Closed socket {}", socket);
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
- }
- } finally {
- latch.countDown();
- }
- }
- });
-
- try {
- latch.await(1,TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- closer.shutdownNow();
- }
- } else {
- LOG.trace("Closing socket {}", socket);
- try {
- socket.close();
- LOG.debug("Closed socket {}", socket);
- } catch (IOException e) {
- LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
- }
- }
- }
- }
-
- @Override
- public void send(ByteBuffer output) throws IOException {
- checkConnected();
- LOG.info("RawTcpTransport sending packet of size: {}", output.remaining());
- WritableByteChannel channel = Channels.newChannel(dataOut);
- channel.write(output);
- dataOut.flush();
- }
-
- @Override
- public void send(ByteBuf output) throws IOException {
- checkConnected();
- send(output.nioBuffer());
- }
-
- @Override
- public boolean isConnected() {
- return this.connected.get();
- }
-
- @Override
- public TransportListener getTransportListener() {
- return this.listener;
- }
-
- @Override
- public void setTransportListener(TransportListener listener) {
- if (listener == null) {
- throw new IllegalArgumentException("Listener cannot be set to null");
- }
-
- this.listener = listener;
- }
-
- public int getSocketBufferSize() {
- return socketBufferSize;
- }
-
- public void setSocketBufferSize(int socketBufferSize) {
- this.socketBufferSize = socketBufferSize;
- }
-
- public int getSoTimeout() {
- return soTimeout;
- }
-
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(Boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public int getSoLinger() {
- return soLinger;
- }
-
- public void setSoLinger(int soLinger) {
- this.soLinger = soLinger;
- }
-
- public boolean isKeepAlive() {
- return keepAlive;
- }
-
- public void setKeepAlive(Boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
-
- public boolean isUseLocalHost() {
- return useLocalHost;
- }
-
- public void setUseLocalHost(boolean useLocalHost) {
- this.useLocalHost = useLocalHost;
- }
-
- public int getIoBufferSize() {
- return ioBufferSize;
- }
-
- public void setIoBufferSize(int ioBufferSize) {
- this.ioBufferSize = ioBufferSize;
- }
-
- public boolean isCloseAsync() {
- return closeAsync;
- }
-
- public void setCloseAsync(boolean closeAsync) {
- this.closeAsync = closeAsync;
- }
-
- //---------- Transport internal implementation ---------------------------//
-
- @Override
- public void run() {
- LOG.trace("TCP consumer thread for " + this + " starting");
- try {
- while (isConnected()) {
- doRun();
- }
- } catch (IOException e) {
- connectionError.set(e);
- onException(e);
- } catch (Throwable e) {
- IOException ioe = new IOException("Unexpected error occured: " + e);
- connectionError.set(ioe);
- ioe.initCause(e);
- onException(ioe);
- }
- }
-
- protected void doRun() throws IOException {
- int size = dataIn.available();
- if (size <= 0) {
- try {
- TimeUnit.NANOSECONDS.sleep(1);
- } catch (InterruptedException e) {
- }
- return;
- }
-
- byte[] buffer = new byte[size];
- dataIn.readFully(buffer);
- listener.onData(Unpooled.wrappedBuffer(buffer));
- }
-
- /**
- * Passes any IO exceptions into the transport listener
- */
- public void onException(IOException e) {
- if (listener != null) {
- try {
- listener.onTransportError(e);
- } catch (RuntimeException e2) {
- LOG.debug("Unexpected runtime exception: " + e2, e2);
- }
- }
- }
-
- protected SocketFactory createSocketFactory() throws IOException {
- return SocketFactory.getDefault();
- }
-
- protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
- try {
- sock.setReceiveBufferSize(socketBufferSize);
- sock.setSendBufferSize(socketBufferSize);
- } catch (SocketException se) {
- LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
- LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
- }
-
- sock.setSoTimeout(soTimeout);
-
- if (keepAlive != null) {
- sock.setKeepAlive(keepAlive.booleanValue());
- }
-
- if (soLinger > -1) {
- sock.setSoLinger(true, soLinger);
- } else if (soLinger == -1) {
- sock.setSoLinger(false, 0);
- }
-
- if (tcpNoDelay != null) {
- sock.setTcpNoDelay(tcpNoDelay.booleanValue());
- }
- }
-
- protected void initializeStreams() throws IOException {
- try {
- TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
- this.dataIn = new DataInputStream(buffIn);
- TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- this.dataOut = new DataOutputStream(outputStream);
- } catch (Throwable e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
- protected String resolveHostName(String host) throws UnknownHostException {
- if (isUseLocalHost()) {
- String localName = InetAddressUtil.getLocalHostName();
- if (localName != null && localName.equals(host)) {
- return "localhost";
- }
- }
- return host;
- }
-
- private void checkConnected() throws IOException {
- if (!connected.get()) {
- throw new IOException("Cannot send to a non-connected transport.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
deleted file mode 100644
index 0860aae..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.qpid.jms.JmsSslContext;
-import org.vertx.java.core.net.NetClient;
-
-/**
- * Provides SSL configuration to the Vert.x NetClient object used by the underling
- * TCP based Transport.
- */
-public class SslTransport extends TcpTransport {
-
- private final JmsSslContext context;
-
- /**
- * Create an instance of the SSL transport
- *
- * @param listener
- * The TransportListener that will handle events from this Transport instance.
- * @param remoteLocation
- * The location that is being connected to.
- * @param context
- * The JMS Framework SslContext to use for this SSL connection.
- */
- public SslTransport(TransportListener listener, URI remoteLocation, JmsSslContext context) {
- super(listener, remoteLocation);
-
- this.context = context;
- }
-
- @Override
- protected void configureNetClient(NetClient client) throws IOException {
- super.configureNetClient(client);
-
- client.setSSL(true);
- client.setKeyStorePath(context.getKeyStoreLocation());
- client.setKeyStorePassword(context.getKeyStorePassword());
- client.setTrustStorePath(context.getTrustStoreLocation());
- client.setTrustStorePassword(context.getTrustStorePassword());
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
deleted file mode 100644
index c7ba887..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An optimized buffered input stream for Tcp
- */
-public class TcpBufferedInputStream extends FilterInputStream {
-
- private static final int DEFAULT_BUFFER_SIZE = 8192;
- protected byte internalBuffer[];
- protected int count;
- protected int position;
-
- public TcpBufferedInputStream(InputStream in) {
- this(in, DEFAULT_BUFFER_SIZE);
- }
-
- public TcpBufferedInputStream(InputStream in, int size) {
- super(in);
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- internalBuffer = new byte[size];
- }
-
- protected void fill() throws IOException {
- byte[] buffer = internalBuffer;
- count = 0;
- position = 0;
- int n = in.read(buffer, position, buffer.length - position);
- if (n > 0) {
- count = n + position;
- }
- }
-
- @Override
- public int read() throws IOException {
- if (position >= count) {
- fill();
- if (position >= count) {
- return -1;
- }
- }
- return internalBuffer[position++] & 0xff;
- }
-
- private int readStream(byte[] b, int off, int len) throws IOException {
- int avail = count - position;
- if (avail <= 0) {
- if (len >= internalBuffer.length) {
- return in.read(b, off, len);
- }
- fill();
- avail = count - position;
- if (avail <= 0) {
- return -1;
- }
- }
- int cnt = (avail < len) ? avail : len;
- System.arraycopy(internalBuffer, position, b, off, cnt);
- position += cnt;
- return cnt;
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
- int n = 0;
- for (;;) {
- int nread = readStream(b, off + n, len - n);
- if (nread <= 0) {
- return (n == 0) ? nread : n;
- }
- n += nread;
- if (n >= len) {
- return n;
- }
- // if not closed but no bytes available, return
- InputStream input = in;
- if (input != null && input.available() <= 0) {
- return n;
- }
- }
- }
-
- @Override
- public long skip(long n) throws IOException {
- if (n <= 0) {
- return 0;
- }
- long avail = count - position;
- if (avail <= 0) {
- return in.skip(n);
- }
- long skipped = (avail < n) ? avail : n;
- position += skipped;
- return skipped;
- }
-
- @Override
- public int available() throws IOException {
- return in.available() + (count - position);
- }
-
- @Override
- public boolean markSupported() {
- return false;
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
deleted file mode 100644
index 82f8c41..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An optimized buffered outputstream for Tcp
- */
-public class TcpBufferedOutputStream extends FilterOutputStream {
-
- private static final int BUFFER_SIZE = 8192;
- private final byte[] buffer;
- private final int bufferlen;
- private int count;
-
- /**
- * Constructor
- *
- * @param out
- */
- public TcpBufferedOutputStream(OutputStream out) {
- this(out, BUFFER_SIZE);
- }
-
- /**
- * Creates a new buffered output stream to write data to the specified underlying output
- * stream with the specified buffer size.
- *
- * @param out
- * the underlying output stream.
- * @param size
- * the buffer size.
- * @throws IllegalArgumentException
- * if size <= 0.
- */
- public TcpBufferedOutputStream(OutputStream out, int size) {
- super(out);
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buffer = new byte[size];
- bufferlen = size;
- }
-
- /**
- * write a byte on to the stream
- *
- * @param b
- * - byte to write
- * @throws IOException
- */
- @Override
- public void write(int b) throws IOException {
- if ((bufferlen - count) < 1) {
- flush();
- }
- buffer[count++] = (byte) b;
- }
-
- /**
- * write a byte array to the stream
- *
- * @param b
- * the byte buffer
- * @param off
- * the offset into the buffer
- * @param len
- * the length of data to write
- * @throws IOException
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- if (b != null) {
- if ((bufferlen - count) < len) {
- flush();
- }
- if (buffer.length >= len) {
- System.arraycopy(b, off, buffer, count, len);
- count += len;
- } else {
- out.write(b, off, len);
- }
- }
- }
-
- /**
- * flush the data to the output stream This doesn't call flush on the underlying
- * outputstream, because Tcp is particularly efficent at doing this itself ....
- *
- * @throws IOException
- */
- @Override
- public void flush() throws IOException {
- if (count > 0 && out != null) {
- out.write(buffer, 0, count);
- count = 0;
- }
- }
-
- /**
- * close this stream
- *
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
deleted file mode 100644
index bc2cada..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.vertx.java.core.AsyncResult;
-import org.vertx.java.core.AsyncResultHandler;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.impl.DefaultVertxFactory;
-import org.vertx.java.core.net.NetClient;
-import org.vertx.java.core.net.NetSocket;
-
-/**
- * Vertex based TCP transport for raw data packets.
- */
-public class TcpTransport implements Transport {
-
- private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
-
- private final Vertx vertx;
- private final NetClient client;
- private final URI remoteLocation;
- private final AtomicBoolean connected = new AtomicBoolean();
- private final AtomicBoolean closed = new AtomicBoolean();
- private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
-
- private NetSocket socket;
-
- private TransportListener listener;
- private int socketBufferSize = 64 * 1024;
- private int soTimeout = -1;
- private int connectTimeout = -1;
- private int soLinger = Integer.MIN_VALUE;
- private boolean keepAlive;
- private boolean tcpNoDelay = true;
-
- /**
- * Create a new instance of the transport.
- *
- * @param listener
- * The TransportListener that will receive data from this Transport instance.
- * @param remoteLocation
- * The remote location where this transport should connection to.
- */
- public TcpTransport(TransportListener listener, URI remoteLocation) {
- this.listener = listener;
- this.remoteLocation = remoteLocation;
-
- DefaultVertxFactory vertxFactory = new DefaultVertxFactory();
- this.vertx = vertxFactory.createVertx();
- this.client = vertx.createNetClient();
- }
-
- @Override
- public void connect() throws IOException {
- final CountDownLatch connectLatch = new CountDownLatch(1);
-
- if (listener == null) {
- throw new IllegalStateException("A transport listener must be set before connection attempts.");
- }
-
- configureNetClient(client);
-
- try {
- client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
- @Override
- public void handle(AsyncResult<NetSocket> asyncResult) {
- if (asyncResult.succeeded()) {
- socket = asyncResult.result();
- LOG.info("We have connected! Socket is {}", socket);
-
- connected.set(true);
- connectLatch.countDown();
-
- socket.dataHandler(new Handler<Buffer>() {
- @Override
- public void handle(Buffer event) {
- listener.onData(event.getByteBuf());
- }
- });
-
- socket.closeHandler(new Handler<Void>() {
- @Override
- public void handle(Void event) {
- if (!closed.get()) {
- connected.set(false);
- listener.onTransportClosed();
- }
- }
- });
-
- socket.exceptionHandler(new Handler<Throwable>() {
- @Override
- public void handle(Throwable event) {
- if (!closed.get()) {
- connected.set(false);
- listener.onTransportError(event);
- }
- }
- });
-
- } else {
- connected.set(false);
- connectionError.set(asyncResult.cause());
- connectLatch.countDown();
- }
- }
- });
- } catch (Throwable reason) {
- LOG.info("Failed to connect to target Broker: {}", reason);
- throw IOExceptionSupport.create(reason);
- }
-
- try {
- connectLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- if (connectionError.get() != null) {
- throw IOExceptionSupport.create(connectionError.get());
- }
- }
-
- @Override
- public void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- if (connected.get()) {
- socket.close();
- connected.set(false);
- }
-
- vertx.stop();
- }
- }
-
- @Override
- public void send(ByteBuffer output) throws IOException {
- checkConnected();
- int length = output.remaining();
- if (length == 0) {
- return;
- }
-
- byte[] copy = new byte[length];
- output.get(copy);
- Buffer sendBuffer = new Buffer(copy);
-
- vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
- }
-
- @Override
- public void send(ByteBuf output) throws IOException {
- checkConnected();
- int length = output.readableBytes();
- if (length == 0) {
- return;
- }
-
- Buffer sendBuffer = new Buffer(output.copy());
- vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
- }
-
- /**
- * Allows a subclass to configure the NetClient beyond what this transport might do.
- *
- * @throws IOException if an error occurs.
- */
- protected void configureNetClient(NetClient client) throws IOException {
- client.setSendBufferSize(getSocketBufferSize());
- client.setReceiveBufferSize(getSocketBufferSize());
- client.setSoLinger(soLinger);
- client.setTCPKeepAlive(keepAlive);
- client.setTCPNoDelay(tcpNoDelay);
- if (connectTimeout >= 0) {
- client.setConnectTimeout(connectTimeout);
- }
- }
-
- @Override
- public boolean isConnected() {
- return this.connected.get();
- }
-
- private void checkConnected() throws IOException {
- if (!connected.get()) {
- throw new IOException("Cannot send to a non-connected transport.");
- }
- }
-
- @Override
- public TransportListener getTransportListener() {
- return this.listener;
- }
-
- @Override
- public void setTransportListener(TransportListener listener) {
- if (listener == null) {
- throw new IllegalArgumentException("Listener cannot be set to null");
- }
-
- this.listener = listener;
- }
-
- public int getSocketBufferSize() {
- return socketBufferSize;
- }
-
- public void setSocketBufferSize(int socketBufferSize) {
- this.socketBufferSize = socketBufferSize;
- }
-
- public int getSoTimeout() {
- return soTimeout;
- }
-
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public int getSoLinger() {
- return soLinger;
- }
-
- public void setSoLinger(int soLinger) {
- this.soLinger = soLinger;
- }
-
- public boolean isKeepAlive() {
- return keepAlive;
- }
-
- public void setKeepAlive(boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
-
- public int getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(int connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
deleted file mode 100644
index e5f90c3..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.transports;
-
-/**
- * Encapsulates all the TCP Transport options in one configuration object.
- */
-public class TcpTransportOptions {
-
- public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
- public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
- public static final int DEFAULT_TRAFFIC_CLASS = 0;
- public static final boolean DEFAULT_TCP_NO_DELAY = true;
- public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
- public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
- public static final int DEFAULT_SO_TIMEOUT = -1;
- public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
-
- private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
- private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
- private int trafficClass = DEFAULT_TRAFFIC_CLASS;
- private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
- private int soTimeout = DEFAULT_SO_TIMEOUT;
- private int soLinger = DEFAULT_SO_LINGER;
- private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
- private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
-
- /**
- * @return the currently set send buffer size in bytes.
- */
- public int getSendBufferSize() {
- return sendBufferSize;
- }
-
- /**
- * Sets the send buffer size in bytes, the value must be greater than zero
- * or an {@link IllegalArgumentException} will be thrown.
- *
- * @param sendBufferSize
- * the new send buffer size for the TCP Transport.
- *
- * @throws IllegalArgumentException if the value given is not in the valid range.
- */
- public void setSendBufferSize(int sendBufferSize) {
- if (sendBufferSize <= 0) {
- throw new IllegalArgumentException("The send buffer size must be > 0");
- }
-
- this.sendBufferSize = sendBufferSize;
- }
-
- /**
- * @return the currently configured receive buffer size in bytes.
- */
- public int getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- /**
- * Sets the receive buffer size in bytes, the value must be greater than zero
- * or an {@link IllegalArgumentException} will be thrown.
- *
- * @param receiveBufferSize
- * the new receive buffer size for the TCP Transport.
- *
- * @throws IllegalArgumentException if the value given is not in the valid range.
- */
- public void setReceiveBufferSize(int receiveBufferSize) {
- if (receiveBufferSize <= 0) {
- throw new IllegalArgumentException("The send buffer size must be > 0");
- }
-
- this.receiveBufferSize = receiveBufferSize;
- }
-
- /**
- * @return the currently configured traffic class value.
- */
- public int getTrafficClass() {
- return trafficClass;
- }
-
- /**
- * Sets the traffic class value used by the TCP connection, valid
- * range is between 0 and 255.
- *
- * @param trafficClass
- * the new traffic class value.
- *
- * @throws IllegalArgumentException if the value given is not in the valid range.
- */
- public void setTrafficClass(int trafficClass) {
- if (trafficClass < 0 || trafficClass > 255) {
- throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
- }
-
- this.trafficClass = trafficClass;
- }
-
- public int getSoTimeout() {
- return soTimeout;
- }
-
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public int getSoLinger() {
- return soLinger;
- }
-
- public void setSoLinger(int soLinger) {
- this.soLinger = soLinger;
- }
-
- public boolean isTcpKeepAlive() {
- return tcpKeepAlive;
- }
-
- public void setTcpKeepAlive(boolean keepAlive) {
- this.tcpKeepAlive = keepAlive;
- }
-
- public int getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(int connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
new file mode 100644
index 0000000..463adfb
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the factories that create {@link Transport} instances.
+ *
+ * Concrete factories are looked up by key through a {@link FactoryFinder} whose search
+ * path is <code>META-INF/services/</code> followed by this class's package name written
+ * as a directory path.
+ */
+public abstract class TransportFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
+
+    private static final FactoryFinder<TransportFactory> TRANSPORT_FACTORY_FINDER =
+        new FactoryFinder<TransportFactory>(TransportFactory.class,
+            "META-INF/services/" + TransportFactory.class.getPackage().getName().replace(".", "/") + "/");
+
+    /**
+     * Creates an instance of the given Transport and configures it using the
+     * properties set on the given remote broker URI.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     *
+     * @return a new Transport instance.
+     *
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    public abstract Transport createTransport(URI remoteURI) throws Exception;
+
+    /**
+     * @return the name of this Transport.
+     */
+    public abstract String getName();
+
+    /**
+     * Static create method that performs the TransportFactory search and then asks the
+     * located factory to create the Transport.
+     *
+     * Any failure is logged and then rethrown unchanged to the caller.
+     *
+     * @param transportKey
+     *        The transport type name used to locate a TransportFactory.
+     * @param remoteURI
+     *        the URI of the remote peer.
+     *
+     * @return the Transport instance produced by the located factory.
+     *
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    public static Transport create(String transportKey, URI remoteURI) throws Exception {
+        try {
+            TransportFactory factory = findTransportFactory(transportKey);
+            return factory.createTransport(remoteURI);
+        } catch (Exception ex) {
+            // The URI may be null here; dereferencing it blindly would raise a new
+            // NullPointerException from the logging call and replace the failure that
+            // is being reported, so fall back to the lookup key in that case.
+            String target = remoteURI != null ? remoteURI.getScheme() : transportKey;
+            LOG.error("Failed to create Transport instance for {}, due to: {}", target, ex);
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Searches for a TransportFactory registered under the given key.
+     *
+     * The lookup itself is delegated to the shared {@link FactoryFinder}; anything it
+     * throws is wrapped in an {@link IOException} that names the requested key.
+     *
+     * @param transportKey
+     *        The transport type name used to locate a TransportFactory.
+     *
+     * @return a Transport factory instance matching the transport key.
+     *
+     * @throws IOException if no key is given or an error occurs while locating the factory.
+     */
+    public static TransportFactory findTransportFactory(String transportKey) throws IOException {
+        if (transportKey == null) {
+            throw new IOException("No Transport key specified");
+        }
+
+        try {
+            return TRANSPORT_FACTORY_FINDER.newInstance(transportKey);
+        } catch (Throwable e) {
+            throw new IOException("Transport type NOT recognized: [" + transportKey + "]", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
new file mode 100644
index 0000000..f0e34ca
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ *
+ * Every option starts at its <code>DEFAULT_*</code> value and is changed through the
+ * matching setter. Only the buffer sizes and the traffic class are range checked, the
+ * remaining setters store whatever value they are given.
+ */
+public class TransportOptions {
+
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+    public static final int DEFAULT_TRAFFIC_CLASS = 0;
+    public static final boolean DEFAULT_TCP_NO_DELAY = true;
+    public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+    public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+    public static final int DEFAULT_SO_TIMEOUT = -1;
+    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+
+    // NOTE(review): this shared instance is mutable, any change made through its setters
+    // is seen by every user of the constant.
+    public static final TransportOptions DEFAULT_OPTIONS = new TransportOptions();
+
+    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+    private int soTimeout = DEFAULT_SO_TIMEOUT;
+    private int soLinger = DEFAULT_SO_LINGER;
+    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+    /**
+     * @return the currently set send buffer size in bytes.
+     */
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    /**
+     * Sets the send buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param sendBufferSize
+     *        the new send buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setSendBufferSize(int sendBufferSize) {
+        if (sendBufferSize <= 0) {
+            throw new IllegalArgumentException("The send buffer size must be > 0");
+        }
+
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the currently configured receive buffer size in bytes.
+     */
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    /**
+     * Sets the receive buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param receiveBufferSize
+     *        the new receive buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        if (receiveBufferSize <= 0) {
+            // The message must name the receive buffer, not the send buffer, so that
+            // the caller can tell which option was rejected.
+            throw new IllegalArgumentException("The receive buffer size must be > 0");
+        }
+
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the currently configured traffic class value.
+     */
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    /**
+     * Sets the traffic class value used by the TCP connection, valid
+     * range is between 0 and 255.
+     *
+     * @param trafficClass
+     *        the new traffic class value.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setTrafficClass(int trafficClass) {
+        if (trafficClass < 0 || trafficClass > 255) {
+            throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+        }
+
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the configured socket timeout value, {@link #DEFAULT_SO_TIMEOUT} unless changed.
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Stores the socket timeout value, no validation is performed.
+     *
+     * @param soTimeout
+     *        the new socket timeout value.
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return true if the TCP no delay option is enabled, which is the default.
+     */
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * @param tcpNoDelay
+     *        true to enable the TCP no delay option, false to disable it.
+     */
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * @return the configured socket linger value, {@link #DEFAULT_SO_LINGER} unless changed.
+     */
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    /**
+     * Stores the socket linger value, no validation is performed.
+     *
+     * @param soLinger
+     *        the new socket linger value.
+     */
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    /**
+     * @return true if the TCP keep alive option is enabled, it is disabled by default.
+     */
+    public boolean isTcpKeepAlive() {
+        return tcpKeepAlive;
+    }
+
+    /**
+     * @param keepAlive
+     *        true to enable the TCP keep alive option, false to disable it.
+     */
+    public void setTcpKeepAlive(boolean keepAlive) {
+        this.tcpKeepAlive = keepAlive;
+    }
+
+    /**
+     * @return the configured connect timeout, {@link #DEFAULT_CONNECT_TIMEOUT} unless changed.
+     */
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    /**
+     * Stores the connect timeout value, no validation is performed.
+     *
+     * @param connectTimeout
+     *        the new connect timeout value.
+     */
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
new file mode 100644
index 0000000..3d4a928
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ *
+ * Incoming data, channel closure and channel errors are forwarded to the configured
+ * {@link TransportListener}, which must be set before {@link #connect()} is called.
+ */
+public class NettyTcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    private Bootstrap bootstrap;
+    private EventLoopGroup group;
+    private Channel channel;
+    private TransportListener listener;
+    private TransportOptions options;
+    private final URI remote;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+    }
+
+    /**
+     * Opens the connection to the host and port of the remote URI and blocks until the
+     * attempt has completed.
+     *
+     * @throws IllegalStateException if no TransportListener has been set.
+     * @throws IOException if the connection attempt is cancelled or fails.
+     */
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup();
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                channel = connectedChannel;
+                channel.pipeline().addLast(new NettyTcpTransportHandler());
+            }
+        });
+
+        configureNetty(bootstrap, getTransportOptions());
+
+        ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
+        future.awaitUninterruptibly();
+
+        if (future.isSuccess()) {
+            connected.set(true);
+            return;
+        }
+
+        // The attempt did not succeed: shut the event loop group down here, otherwise
+        // it would stay alive for a transport that never connected.
+        group.shutdownGracefully();
+
+        if (future.isCancelled()) {
+            throw new IOException("Connection attempt was cancelled");
+        }
+
+        throw IOExceptionSupport.create(future.cause());
+    }
+
+    /**
+     * @return true once connect() has succeeded and until the transport is closed or
+     *         the channel reports that it went inactive or failed.
+     */
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    /**
+     * Closes the channel and shuts down the event loop group. Only the first call has
+     * any effect, and the call is safe on a transport that was never connected.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            // The channel handler skips its own state update once 'closed' is set, so
+            // the connected flag has to be cleared here.
+            connected.set(false);
+
+            // Both are still null when connect() was never called.
+            if (channel != null) {
+                channel.close();
+            }
+            if (group != null) {
+                group.shutdownGracefully();
+            }
+        }
+    }
+
+    /**
+     * Wraps the given buffer without copying it and sends it on the channel.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        send(Unpooled.wrappedBuffer(output));
+    }
+
+    /**
+     * Writes the given buffer to the channel and flushes it.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        channel.write(output);
+        channel.flush();
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * @return the options given at construction, or the shared
+     *         {@link TransportOptions#DEFAULT_OPTIONS} instance when none were given.
+     */
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+
+    //----- Internal implementation details ----------------------------------//
+
+    /**
+     * Copies the transport options onto the Netty bootstrap as channel options.
+     *
+     * @param bootstrap
+     *        the bootstrap that will create the channel.
+     * @param options
+     *        the options to apply.
+     */
+    protected void configureNetty(Bootstrap bootstrap, TransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        if (options.getReceiveBufferSize() != -1) {
+            // The receive side must be sized from the receive buffer option; using the
+            // send buffer size here would silently ignore the receive setting.
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    /**
+     * Channel handler that forwards channel events to the TransportListener. Closure
+     * and error events are not forwarded once the transport has been closed.
+     */
+    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has gone inactive! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.info("Exception on channel! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportError(cause);
+            }
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+            LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
+            listener.onData(buffer);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
new file mode 100644
index 0000000..d51013c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * TransportFactory implementation that produces {@link NettyTcpTransport} instances
+ * configured from the query string of the remote URI.
+ */
+public class NettyTcpTransportFactory extends TransportFactory {
+
+    /**
+     * Builds a transport for the given URI. The query parameters that carry the
+     * <code>transport.</code> prefix are applied to a fresh {@link TransportOptions}
+     * and the URI query is rebuilt from the parsed parameter map before the transport
+     * is created.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     *
+     * @return a new Transport instance.
+     *
+     * @throws IllegalArgumentException if not every transport option could be applied.
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        Map<String, String> queryParameters = PropertyUtil.parseQuery(remoteURI.getQuery());
+        Map<String, String> transportSettings = PropertyUtil.filterProperties(queryParameters, "transport.");
+
+        // NOTE(review): presumably filterProperties removes the matched entries from the
+        // parsed map so the rebuilt query no longer carries them - confirm in PropertyUtil.
+        URI targetURI = PropertyUtil.replaceQuery(remoteURI, queryParameters);
+
+        TransportOptions configuredOptions = new TransportOptions();
+        boolean allApplied = PropertyUtil.setProperties(configuredOptions, transportSettings);
+
+        if (allApplied) {
+            return doCreateTransport(targetURI, configuredOptions);
+        }
+
+        StringBuilder complaint = new StringBuilder();
+        complaint.append(" Not all transport options could be set on the Transport.");
+        complaint.append(" Check the options are spelled correctly.");
+        complaint.append(" Given parameters=[").append(transportSettings).append("].");
+        complaint.append(" This provider instance cannot be started.");
+
+        throw new IllegalArgumentException(complaint.toString());
+    }
+
+    /**
+     * Instantiates the transport itself, kept separate so that subclasses can supply
+     * a different NettyTcpTransport type.
+     *
+     * @param remoteURI
+     *        the URI handed to the new transport.
+     * @param transportOptions
+     *        the options handed to the new transport.
+     *
+     * @return a new NettyTcpTransport instance.
+     *
+     * @throws Exception if an error occurs while creating the Transport instance.
+     */
+    protected NettyTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new NettyTcpTransport(remoteURI, transportOptions);
+    }
+
+    /**
+     * @return the fixed name "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org