You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:39 UTC

[15/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
new file mode 100644
index 0000000..c7ba887
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A buffered input stream optimized for TCP.
+ * <p>
+ * Data is pulled from the wrapped stream into an internal buffer. A bulk read that is at
+ * least as large as the buffer bypasses it when the buffer is empty, and a bulk read returns
+ * as soon as the wrapped stream reports that no further bytes are available instead of
+ * trying to fill the caller's array. Mark/reset is not supported.
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    /** Backing store for buffered bytes. */
+    protected byte internalBuffer[];
+    /** Index one past the last valid byte in {@link #internalBuffer}. */
+    protected int count;
+    /** Index of the next byte to hand out from {@link #internalBuffer}. */
+    protected int position;
+
+    /**
+     * Creates a buffered stream using the default buffer size of 8192 bytes.
+     *
+     * @param in
+     *        the underlying input stream.
+     */
+    public TcpBufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a buffered stream with the given buffer size.
+     *
+     * @param in
+     *        the underlying input stream.
+     * @param size
+     *        the buffer size in bytes.
+     * @throws IllegalArgumentException
+     *         if size <= 0.
+     */
+    public TcpBufferedInputStream(InputStream in, int size) {
+        super(in);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        internalBuffer = new byte[size];
+    }
+
+    /**
+     * Discards whatever is buffered and performs a single read from the underlying stream
+     * into the start of the buffer. On end of stream (or a zero-length read) the buffer is
+     * left empty.
+     *
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    protected void fill() throws IOException {
+        byte[] buffer = internalBuffer;
+        count = 0;
+        position = 0;
+        int n = in.read(buffer, position, buffer.length - position);
+        if (n > 0) {
+            count = n + position;
+        }
+    }
+
+    /**
+     * Reads a single byte, refilling the buffer when it is exhausted.
+     *
+     * @return the next byte as a value in the range 0-255, or -1 if no byte could be read.
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    @Override
+    public int read() throws IOException {
+        if (position >= count) {
+            fill();
+            if (position >= count) {
+                return -1;
+            }
+        }
+        return internalBuffer[position++] & 0xff;
+    }
+
+    /**
+     * Performs one read step: serves from the buffer when it holds data, otherwise either
+     * reads straight into the caller's array (request at least as large as the buffer) or
+     * refills the buffer and serves from it.
+     */
+    private int readStream(byte[] b, int off, int len) throws IOException {
+        int avail = count - position;
+        if (avail <= 0) {
+            // Large requests skip the intermediate copy through the internal buffer.
+            if (len >= internalBuffer.length) {
+                return in.read(b, off, len);
+            }
+            fill();
+            avail = count - position;
+            if (avail <= 0) {
+                return -1;
+            }
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(internalBuffer, position, b, off, cnt);
+        position += cnt;
+        return cnt;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}. The call keeps reading while the
+     * underlying stream reports available bytes and returns early once it does not.
+     *
+     * @param b
+     *        destination array.
+     * @param off
+     *        offset in {@code b} at which to start storing bytes.
+     * @param len
+     *        maximum number of bytes to read.
+     * @return the number of bytes read, 0 if {@code len} is 0, or the underlying result
+     *         (such as -1) when nothing could be read.
+     * @throws IndexOutOfBoundsException
+     *         if {@code off}/{@code len} do not describe a valid range of {@code b}.
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+        int n = 0;
+        for (;;) {
+            int nread = readStream(b, off + n, len - n);
+            if (nread <= 0) {
+                return (n == 0) ? nread : n;
+            }
+            n += nread;
+            if (n >= len) {
+                return n;
+            }
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0) {
+                return n;
+            }
+        }
+    }
+
+    /**
+     * Skips bytes, consuming buffered data first. When the buffer holds data only buffered
+     * bytes are skipped, so fewer than {@code n} bytes may be skipped.
+     *
+     * @param n
+     *        the number of bytes to skip.
+     * @return the number of bytes actually skipped.
+     * @throws IOException
+     *         if the underlying skip fails.
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        long avail = count - position;
+        if (avail <= 0) {
+            return in.skip(n);
+        }
+        long skipped = (avail < n) ? avail : n;
+        position += skipped;
+        return skipped;
+    }
+
+    /**
+     * Returns the number of buffered bytes plus what the underlying stream reports as
+     * available, saturating at {@link Integer#MAX_VALUE} so the sum can never wrap around
+     * to a negative value.
+     *
+     * @return an estimate of the bytes readable without blocking.
+     * @throws IOException
+     *         if the underlying stream fails.
+     */
+    @Override
+    public int available() throws IOException {
+        int buffered = count - position;
+        int pending = in.available();
+        if (pending > Integer.MAX_VALUE - buffered) {
+            return Integer.MAX_VALUE;
+        }
+        return pending + buffered;
+    }
+
+    /**
+     * @return always false; mark/reset is not supported.
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Closes the underlying stream if there is one.
+     *
+     * @throws IOException
+     *         if closing the underlying stream fails.
+     */
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            in.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
new file mode 100644
index 0000000..82f8c41
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An optimized buffered outputstream for Tcp
+ */
+public class TcpBufferedOutputStream extends FilterOutputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+    private final byte[] buffer;
+    private final int bufferlen;
+    private int count;
+
+    /**
+     * Constructor
+     *
+     * @param out
+     */
+    public TcpBufferedOutputStream(OutputStream out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified underlying output
+     * stream with the specified buffer size.
+     *
+     * @param out
+     *        the underlying output stream.
+     * @param size
+     *        the buffer size.
+     * @throws IllegalArgumentException
+     *         if size <= 0.
+     */
+    public TcpBufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        bufferlen = size;
+    }
+
+    /**
+     * write a byte on to the stream
+     *
+     * @param b
+     *        - byte to write
+     * @throws IOException
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if ((bufferlen - count) < 1) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    /**
+     * write a byte array to the stream
+     *
+     * @param b
+     *        the byte buffer
+     * @param off
+     *        the offset into the buffer
+     * @param len
+     *        the length of data to write
+     * @throws IOException
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        if (b != null) {
+            if ((bufferlen - count) < len) {
+                flush();
+            }
+            if (buffer.length >= len) {
+                System.arraycopy(b, off, buffer, count, len);
+                count += len;
+            } else {
+                out.write(b, off, len);
+            }
+        }
+    }
+
+    /**
+     * flush the data to the output stream This doesn't call flush on the underlying
+     * outputstream, because Tcp is particularly efficent at doing this itself ....
+     *
+     * @throws IOException
+     */
+    @Override
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+            out.write(buffer, 0, count);
+            count = 0;
+        }
+    }
+
+    /**
+     * close this stream
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
new file mode 100644
index 0000000..4a58c8e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.VertxFactory;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.net.NetClient;
+import org.vertx.java.core.net.NetSocket;
+
+/**
+ * Vertex based TCP transport for raw data packets.
+ */
+public class TcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+    private final Vertx vertx = VertxFactory.newVertx();
+    private final NetClient client = vertx.createNetClient();
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private NetSocket socket;
+
+    private TransportListener listener;
+    private int socketBufferSize = 64 * 1024;
+    private int soTimeout = -1;
+    private int connectTimeout = -1;
+    private int soLinger = Integer.MIN_VALUE;
+    private boolean keepAlive;
+    private boolean tcpNoDelay = true;
+
+    /**
+     * Create a new instance of the transport.
+     *
+     * @param listener
+     *        The TransportListener that will receive data from this Transport instance.
+     * @param remoteLocation
+     *        The remote location where this transport should connection to.
+     */
+    public TcpTransport(TransportListener listener, URI remoteLocation) {
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+    }
+
+    @Override
+    public void connect() throws IOException {
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        configureNetClient(client);
+
+        try {
+            client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
+                @Override
+                public void handle(AsyncResult<NetSocket> asyncResult) {
+                    if (asyncResult.succeeded()) {
+                        socket = asyncResult.result();
+                        LOG.info("We have connected! Socket is {}", socket);
+
+                        connected.set(true);
+                        connectLatch.countDown();
+
+                        socket.dataHandler(new Handler<Buffer>() {
+                            @Override
+                            public void handle(Buffer event) {
+                                listener.onData(event);
+                            }
+                        });
+
+                        socket.closeHandler(new Handler<Void>() {
+                            @Override
+                            public void handle(Void event) {
+                                connected.set(false);
+                                listener.onTransportClosed();
+                            }
+                        });
+
+                        socket.exceptionHandler(new Handler<Throwable>() {
+                            @Override
+                            public void handle(Throwable event) {
+                                connected.set(false);
+                                listener.onTransportError(event);
+                            }
+                        });
+
+                    } else {
+                        connected.set(false);
+                        connectionError.set(asyncResult.cause());
+                        connectLatch.countDown();
+                    }
+                }
+            });
+        } catch (Throwable reason) {
+            LOG.info("Failed to connect to target Broker: {}", reason);
+            throw IOExceptionSupport.create(reason);
+        }
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            if (connected.get()) {
+                socket.close();
+                connected.set(false);
+            }
+
+            vertx.stop();
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        int length = output.remaining();
+        if (length == 0) {
+            return;
+        }
+
+        byte[] copy = new byte[length];
+        output.get(copy);
+        Buffer sendBuffer = new Buffer(copy);
+
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    @Override
+    public void send(org.fusesource.hawtbuf.Buffer output) throws IOException {
+        checkConnected();
+        int length = output.length();
+        if (length == 0) {
+            return;
+        }
+
+        org.fusesource.hawtbuf.Buffer clone = output.deepCopy();
+        Buffer sendBuffer = new Buffer(clone.data);
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Allows a subclass to configure the NetClient beyond what this transport might do.
+     *
+     * @throws IOException if an error occurs.
+     */
+    protected void configureNetClient(NetClient client) throws IOException {
+        client.setSendBufferSize(getSocketBufferSize());
+        client.setReceiveBufferSize(getSocketBufferSize());
+        client.setSoLinger(soLinger);
+        client.setTCPKeepAlive(keepAlive);
+        client.setTCPNoDelay(tcpNoDelay);
+        if (connectTimeout >= 0) {
+            client.setConnectTimeout(connectTimeout);
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
new file mode 100644
index 0000000..4cced80
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Base interface for all QpidJMS Transport instances.
+ */
+public interface Transport {
+
+    /**
+     * Performs the protocol connect operation for the implemented Transport type
+     * such as a TCP socket connection etc.
+     *
+     * @throws IOException if an error occurs while attempting the connect.
+     */
+    void connect() throws IOException;
+
+    /**
+     * @return true if transport is connected or false if the connection is down.
+     */
+    boolean isConnected();
+
+    /**
+     * Close the Transport, no additional send operations are accepted.
+     *
+     * @throws IOException if an error occurs while closing the connection.
+     */
+    void close() throws IOException;
+
+    /**
+     * Sends a chunk of data, given as an NIO ByteBuffer, over the Transport connection.
+     *
+     * @param output
+     *        The buffer of data that is to be transmitted.
+     *
+     * @throws IOException if an error occurs during the send operation.
+     */
+    void send(ByteBuffer output) throws IOException;
+
+    /**
+     * Sends a chunk of data, given as a HawtBuf Buffer, over the Transport connection.
+     *
+     * @param output
+     *        The buffer of data that is to be transmitted.
+     *
+     * @throws IOException if an error occurs during the send operation.
+     */
+    void send(Buffer output) throws IOException;
+
+    /**
+     * Gets the currently set TransportListener instance
+     *
+     * @return the current TransportListener or null if none set.
+     */
+    TransportListener getTransportListener();
+
+    /**
+     * Sets the Transport Listener instance that will be notified of incoming data or
+     * error events.
+     *
+     * @param listener
+     *        The new TransportListener instance to use (cannot be null).
+     *
+     * @throws IllegalArgumentException if the given listener is null.
+     */
+    void setTransportListener(TransportListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
new file mode 100644
index 0000000..f244347
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ * Listener interface that should be implemented by users of the various
+ * QpidJMS Transport classes in order to receive data, close and error events.
+ */
+public interface TransportListener {
+
+    /**
+     * Called when new incoming data has become available.
+     *
+     * @param incoming
+     *        the next incoming packet of data, delivered as a Vert.x Buffer.
+     */
+    void onData(Buffer incoming);
+
+    /**
+     * Called if the connection state becomes closed.
+     */
+    void onTransportClosed();
+
+    /**
+     * Called when an error occurs during normal Transport operations.
+     *
+     * @param cause
+     *        the error that triggered this event.
+     */
+    void onTransportError(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
new file mode 100644
index 0000000..f20c21f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Abstract Message Queue class used to implement the common functions of a Message Queue
+ * instance.
+ */
+public abstract class AbstractMessageQueue implements MessageQueue {
+
+    protected boolean closed;
+    protected boolean running;
+    protected Object lock = new Object();
+
+    @Override
+    public JmsInboundMessageDispatch peek() {
+        synchronized (lock) {
+            return peekFirst();
+        }
+    }
+
+    @Override
+    public JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException {
+        synchronized (lock) {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && !closed && (isEmpty() || !running)) {
+                if (timeout == -1) {
+                    lock.wait();
+                } else {
+                    lock.wait(timeout);
+                    break;
+                }
+            }
+
+            if (closed || !running || isEmpty()) {
+                return null;
+            }
+
+            return removeFirst();
+        }
+    }
+
+    @Override
+    public JmsInboundMessageDispatch dequeueNoWait() {
+        synchronized (lock) {
+            if (closed || !running || isEmpty()) {
+                return null;
+            }
+            return removeFirst();
+        }
+    }
+
+    @Override
+    public void start() {
+        synchronized (lock) {
+            running = true;
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void stop() {
+        synchronized (lock) {
+            running = false;
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            if (!closed) {
+                running = false;
+                closed = true;
+            }
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public Object getLock() {
+        return lock;
+    }
+
+    /**
+     * Removes and returns the first entry in the implementation queue.  This method
+     * is always called under lock and does not need to protect itself or check running
+     * state etc.
+     *
+     * @return the first message queued in the implemented queue.
+     */
+    protected abstract JmsInboundMessageDispatch removeFirst();
+
+    /**
+     * Returns but does not remove the first entry in the implementation queue.  This method
+     * is always called under lock and does not need to protect itself or check running
+     * state etc.
+     *
+     * @return the first message queued in the implemented queue.
+     */
+    protected abstract JmsInboundMessageDispatch peekFirst();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
new file mode 100644
index 0000000..8632ee2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ObjectInputStream} that resolves classes through several class loaders in
+ * turn: the thread context class loader, the loader of the wrapped stream's class and,
+ * as a last resort, the loader that defined this class.
+ *
+ * NOTE(review): this stream places no restriction on which classes may be resolved;
+ * confirm that callers only feed it trusted data.
+ */
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClassLoadingAwareObjectInputStream.class);
+    private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+
+    private final ClassLoader inLoader;
+
+    /**
+     * Wraps the given stream and remembers the class loader of the stream's class so it
+     * can be consulted when resolving classes.
+     *
+     * @param in
+     *        the stream to read serialized objects from.
+     *
+     * @throws IOException if the underlying {@link ObjectInputStream} cannot be initialized.
+     */
+    public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+        super(in);
+        inLoader = in.getClass().getClassLoader();
+    }
+
+    /**
+     * Resolves the described class using the thread context class loader, then the
+     * loader of the wrapped stream's class, then the fallback loader.
+     */
+    @Override
+    protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        return load(classDesc.getName(), cl, inLoader);
+    }
+
+    /**
+     * Resolves a dynamic proxy class for the named interfaces.  The interfaces are loaded
+     * via the thread context class loader (with the fallback loader as backup); the proxy
+     * class itself is then requested from the context loader, the stream's loader and the
+     * fallback loader in that order.
+     *
+     * @throws ClassNotFoundException if an interface cannot be loaded or no loader accepts
+     *         the set of interfaces.
+     */
+    @Override
+    protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Class<?>[] cinterfaces = new Class[interfaces.length];
+        for (int i = 0; i < interfaces.length; i++) {
+            cinterfaces[i] = load(interfaces[i], cl);
+        }
+
+        try {
+            return Proxy.getProxyClass(cl, cinterfaces);
+        } catch (IllegalArgumentException e) {
+            try {
+                return Proxy.getProxyClass(inLoader, cinterfaces);
+            } catch (IllegalArgumentException ignored) {
+                // this loader cannot see all of the interfaces, try the next one
+            }
+            try {
+                return Proxy.getProxyClass(FALLBACK_CLASS_LOADER, cinterfaces);
+            } catch (IllegalArgumentException ignored) {
+                // no loader left to try, report the original failure below
+            }
+
+            throw new ClassNotFoundException(
+                "Could not create proxy class for interfaces: " + java.util.Arrays.toString(interfaces), e);
+        }
+    }
+
+    /**
+     * Loads the named class, first checking the table of simple types, then each of the
+     * supplied class loaders in order, and finally the fallback class loader.
+     *
+     * @throws ClassNotFoundException if none of the loaders can find the class.
+     */
+    private Class<?> load(String className, ClassLoader... cl) throws ClassNotFoundException {
+        // check for simple types first
+        final Class<?> clazz = loadSimpleType(className);
+        if (clazz != null) {
+            LOG.trace("Loaded class: {} as simple type -> {}", className, clazz);
+            return clazz;
+        }
+
+        // try the different class loaders
+        for (ClassLoader loader : cl) {
+            // log the loader actually being tried, not the whole candidate array
+            LOG.trace("Attempting to load class: {} using classloader: {}", className, loader);
+            try {
+                Class<?> answer = Class.forName(className, false, loader);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Loaded class: {} using classloader: {} -> {}", new Object[] { className, loader, answer });
+                }
+                return answer;
+            } catch (ClassNotFoundException e) {
+                LOG.trace("Class not found: {} using classloader: {}", className, loader);
+                // ignore and move on to the next loader
+            }
+        }
+
+        // and then the fallback class loader
+        return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+    }
+
+    /**
+     * Load a simple type
+     *
+     * @param name
+     *        the name of the class to load
+     * @return the class or {@code null} if the name is not one of the known simple types
+     */
+    public static Class<?> loadSimpleType(String name) {
+        if ("java.lang.byte[]".equals(name) || "byte[]".equals(name)) {
+            return byte[].class;
+        } else if ("java.lang.Byte[]".equals(name) || "Byte[]".equals(name)) {
+            return Byte[].class;
+        } else if ("java.lang.Object[]".equals(name) || "Object[]".equals(name)) {
+            return Object[].class;
+        } else if ("java.lang.String[]".equals(name) || "String[]".equals(name)) {
+            return String[].class;
+            // and these are common as well
+        } else if ("java.lang.String".equals(name) || "String".equals(name)) {
+            return String.class;
+        } else if ("java.lang.Boolean".equals(name) || "Boolean".equals(name)) {
+            return Boolean.class;
+        } else if ("boolean".equals(name)) {
+            return boolean.class;
+        } else if ("java.lang.Integer".equals(name) || "Integer".equals(name)) {
+            return Integer.class;
+        } else if ("int".equals(name)) {
+            return int.class;
+        } else if ("java.lang.Long".equals(name) || "Long".equals(name)) {
+            return Long.class;
+        } else if ("long".equals(name)) {
+            return long.class;
+        } else if ("java.lang.Short".equals(name) || "Short".equals(name)) {
+            return Short.class;
+        } else if ("short".equals(name)) {
+            return short.class;
+        } else if ("java.lang.Byte".equals(name) || "Byte".equals(name)) {
+            return Byte.class;
+        } else if ("byte".equals(name)) {
+            return byte.class;
+        } else if ("java.lang.Float".equals(name) || "Float".equals(name)) {
+            return Float.class;
+        } else if ("float".equals(name)) {
+            return float.class;
+        } else if ("java.lang.Double".equals(name) || "Double".equals(name)) {
+            return Double.class;
+        } else if ("double".equals(name)) {
+            return double.class;
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
new file mode 100644
index 0000000..b8db705
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Factory finding helper class used to locate objects that serve as Factories for
+ * other Object types.  The search and instantiate mechanism is configurable so that
+ * in a non-stand-alone environment such as OSGi the finder can be configured to work.
+ */
+public class FactoryFinder<T extends Object> {
+
+    /**
+     * The strategy that the FactoryFinder uses to find, load and instantiate Objects can be
+     * changed out by calling the
+     * {@link org.apache.qpid.jms.util.FactoryFinder#setObjectFactory(org.apache.qpid.jms.util.FactoryFinder.ObjectFactory)}
+     * method with a custom implementation of ObjectFactory.
+     *
+     * The default ObjectFactory is typically changed out when running in a specialized
+     * container environment where service discovery needs to be done via the container system.
+     * For example, in an OSGi scenario.
+     */
+    public interface ObjectFactory {
+
+        /**
+         * Creates the requested factory instance.
+         *
+         * @param path
+         *        the full service path
+         *
+         * @return instance of the factory object being searched for.
+         *
+         * @throws IllegalAccessException if an error occurs while accessing the search path.
+         * @throws InstantiationException if the factory object fails on create.
+         * @throws IOException if the search encounters an IO error.
+         * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+         */
+        public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;
+
+    }
+
+    private static ObjectFactory objectFactory = new StandaloneObjectFactory();
+
+    private final ConcurrentHashMap<String, T> cachedFactories = new ConcurrentHashMap<String, T>();
+    private final String path;
+    private final Class<T> factoryType;
+
+    /**
+     * Creates a new instance of the FactoryFinder using the given search path.
+     *
+     * @param factoryType
+     *        The type that every factory returned from this finder must be an instance of.
+     * @param path
+     *        The path to use when searching for the factory definitions.
+     */
+    public FactoryFinder(Class<T> factoryType, String path) {
+        this.path = path;
+        this.factoryType = factoryType;
+    }
+
+    /**
+     * @return the currently configured ObjectFactory instance used to locate the Factory objects.
+     */
+    public static ObjectFactory getObjectFactory() {
+        return objectFactory;
+    }
+
+    /**
+     * Sets the ObjectFactory instance to use when searching for the Factory class.  This allows
+     * the default instance to be overridden in an environment where the basic version will not
+     * work.
+     *
+     * @param objectFactory
+     *        the new object factory to use when searching for a Factory instance.
+     */
+    public static void setObjectFactory(ObjectFactory objectFactory) {
+        FactoryFinder.objectFactory = objectFactory;
+    }
+
+    /**
+     * Creates a new instance of the given key.  The method first checks the cache of previously
+     * found factory instances for one that matches the key.  If no cached version exists then
+     * the factory will be searched for using the configured ObjectFactory instance.
+     *
+     * @param key
+     *        is the key to add to the path to find a text file containing the factory name
+     *
+     * @return a newly created instance, or the cached instance from an earlier call.
+     *
+     * @throws IllegalAccessException if an error occurs while accessing the search path.
+     * @throws InstantiationException if the factory object fails on create.
+     * @throws IOException if the search encounters an IO error.
+     * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+     * @throws ClassCastException if the ObjectFactory returns null or an object that is
+     *         not assignable to the requested factory type.
+     */
+    public T newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException, ClassCastException {
+        T factory = cachedFactories.get(key);
+        if (factory == null) {
+
+            Object found = objectFactory.create(path + key);
+            if (found == null) {
+                // Report a null result explicitly rather than failing with a
+                // NullPointerException while building the error message.
+                throw new ClassCastException("Cannot cast null to " + factoryType.getName() +
+                    ": no factory object was created for key " + key);
+            }
+            if (!factoryType.isInstance(found)) {
+                throw new ClassCastException("Cannot cast " + found.getClass().getName() +
+                    " to " + factoryType.getName());
+            }
+
+            factory = factoryType.cast(found);
+            cachedFactories.put(key, factory);
+        }
+
+        return factory;
+    }
+
+    /**
+     * Allow registration of a Provider factory without wiring via META-INF classes
+     *
+     * @param scheme
+     *        The URI scheme value that names the target Provider instance.
+     * @param factory
+     *        The factory to register in this finder.
+     */
+    public void registerProviderFactory(String scheme, T factory) {
+        cachedFactories.put(scheme, factory);
+    }
+
+    /**
+     * The default implementation of Object factory which works well in stand-alone applications.
+     * It reads a properties resource from the class path, loads the class named by its
+     * "class" property and creates a new instance of that class on every call.
+     */
+    @SuppressWarnings("rawtypes")
+    protected static class StandaloneObjectFactory implements ObjectFactory {
+        final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
+
+        @Override
+        public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
+            Class clazz = classMap.get(path);
+            if (clazz == null) {
+                clazz = loadClass(loadProperties(path));
+                classMap.put(path, clazz);
+            }
+            return clazz.newInstance();
+        }
+
+        /**
+         * Loads the class named by the "class" property, trying the thread context class
+         * loader first and falling back to the loader of FactoryFinder.
+         *
+         * @param properties
+         *        the properties that name the class to load.
+         *
+         * @return the loaded class.
+         *
+         * @throws ClassNotFoundException if neither class loader can find the class.
+         * @throws IOException if the "class" property is missing.
+         */
+        public static Class loadClass(Properties properties) throws ClassNotFoundException, IOException {
+
+            String className = properties.getProperty("class");
+            if (className == null) {
+                throw new IOException("Expected property is missing: class");
+            }
+            Class clazz = null;
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            if (loader != null) {
+                try {
+                    clazz = loader.loadClass(className);
+                } catch (ClassNotFoundException ignored) {
+                    // not visible to the context loader, fall through to our own loader
+                }
+            }
+            if (clazz == null) {
+                clazz = FactoryFinder.class.getClassLoader().loadClass(className);
+            }
+
+            return clazz;
+        }
+
+        /**
+         * Loads the properties resource at the given location, trying the thread context
+         * class loader first and falling back to the loader of FactoryFinder.
+         *
+         * @param uri
+         *        the class path location of the properties resource.
+         *
+         * @return the properties read from the resource.
+         *
+         * @throws IOException if the resource cannot be found or cannot be read.
+         */
+        public static Properties loadProperties(String uri) throws IOException {
+            // lets try the thread context class loader first
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            if (classLoader == null) {
+                classLoader = StandaloneObjectFactory.class.getClassLoader();
+            }
+            InputStream in = classLoader.getResourceAsStream(uri);
+            if (in == null) {
+                in = FactoryFinder.class.getClassLoader().getResourceAsStream(uri);
+                if (in == null) {
+                    throw new IOException("Could not find factory class for resource: " + uri);
+                }
+            }
+
+            // lets load the file
+            BufferedInputStream reader = new BufferedInputStream(in);
+            try {
+                Properties properties = new Properties();
+                properties.load(reader);
+                return properties;
+            } finally {
+                try {
+                    reader.close();
+                } catch (Exception ignored) {
+                    // best effort: a failed close must not replace the outcome of the load
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
new file mode 100644
index 0000000..b50e480
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * A first in / first out Message Queue that keeps its entries in a linked list and
+ * guards every access to that list with the queue lock.
+ */
+public final class FifoMessageQueue extends AbstractMessageQueue {
+
+    protected final LinkedList<JmsInboundMessageDispatch> list = new LinkedList<JmsInboundMessageDispatch>();
+
+    /**
+     * Puts the envelope at the head of the queue and wakes one thread waiting on the lock.
+     */
+    @Override
+    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            list.addFirst(envelope);
+            lock.notify();
+        }
+    }
+
+    /**
+     * Puts the envelope at the tail of the queue and wakes one thread waiting on the lock.
+     */
+    @Override
+    public void enqueue(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            list.addLast(envelope);
+            lock.notify();
+        }
+    }
+
+    /**
+     * @return true when no entries are currently queued.
+     */
+    @Override
+    public boolean isEmpty() {
+        synchronized (lock) {
+            return list.isEmpty();
+        }
+    }
+
+    /**
+     * @return the number of entries currently queued.
+     */
+    @Override
+    public int size() {
+        synchronized (lock) {
+            return list.size();
+        }
+    }
+
+    /**
+     * Discards every queued entry.
+     */
+    @Override
+    public void clear() {
+        synchronized (lock) {
+            list.clear();
+        }
+    }
+
+    /**
+     * Empties the queue and hands back what it held.
+     *
+     * @return a new list holding the removed entries in queue order.
+     */
+    @Override
+    public List<JmsInboundMessageDispatch> removeAll() {
+        synchronized (lock) {
+            // Snapshot the entries in iteration order before emptying the backing list.
+            ArrayList<JmsInboundMessageDispatch> drained = new ArrayList<JmsInboundMessageDispatch>(list);
+            list.clear();
+            return drained;
+        }
+    }
+
+    /**
+     * @return the string form of the backing list.
+     */
+    @Override
+    public String toString() {
+        synchronized (lock) {
+            return list.toString();
+        }
+    }
+
+    /**
+     * Removes the head of the backing list without taking the lock itself.
+     */
+    @Override
+    protected JmsInboundMessageDispatch removeFirst() {
+        return list.removeFirst();
+    }
+
+    /**
+     * Returns the head of the backing list, or null when it is empty, without
+     * removing it and without taking the lock itself.
+     */
+    @Override
+    protected JmsInboundMessageDispatch peekFirst() {
+        return list.peekFirst();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
new file mode 100644
index 0000000..6caf15c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+
+/**
+ * Helper for turning arbitrary failures into IOException instances.
+ */
+public class IOExceptionSupport {
+
+    /**
+     * Returns the given cause unchanged when it already is an IOException; otherwise
+     * wraps it in a new IOException.  The wrapper's message is the cause's own message,
+     * or the cause's string form when that message is null or empty.
+     *
+     * @param cause
+     *        The initiating exception that should be cast or wrapped.
+     *
+     * @return an IOException instance.
+     */
+    public static IOException create(Throwable cause) {
+        if (!(cause instanceof IOException)) {
+            final String detail = cause.getMessage();
+            final boolean noDetail = detail == null || detail.length() == 0;
+            return new IOException(noDetail ? cause.toString() : detail, cause);
+        }
+
+        return (IOException) cause;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
new file mode 100644
index 0000000..f07b5b4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generator for Globally unique Strings.  Each generated id is made up of a prefix, a
+ * stub computed once per class load (local port, timestamp), a per-instance counter and
+ * a per-instance sequence number.
+ */
+public class IdGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IdGenerator.class);
+    private static final String UNIQUE_STUB;
+    private static int instanceCount;
+    private static String hostName;
+    private final String seed;
+    private final AtomicLong sequence = new AtomicLong(1);
+    private final int length;
+    public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";
+
+    static {
+        String stub = "";
+        boolean canAccessSystemProps = true;
+        try {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm != null) {
+                sm.checkPropertiesAccess();
+            }
+        } catch (SecurityException se) {
+            canAccessSystemProps = false;
+        }
+
+        if (canAccessSystemProps) {
+            int idGeneratorPort = 0;
+            ServerSocket ss = null;
+            try {
+                idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0"));
+                LOG.trace("Using port {}", idGeneratorPort);
+                hostName = InetAddressUtil.getLocalHostName();
+                // Binding a server socket yields a port no other local process holds right now.
+                ss = new ServerSocket(idGeneratorPort);
+                stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
+                // NOTE(review): presumably the pause keeps the port bound long enough that the
+                // port/timestamp pair cannot repeat in another process - confirm.
+                Thread.sleep(100);
+            } catch (Exception e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("could not generate unique stub by using DNS and binding to local port", e);
+                } else {
+                    LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage());
+                }
+
+                // Restore interrupted state so higher level code can deal with it.
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            } finally {
+                if (ss != null) {
+                    try {
+                        ss.close();
+                    } catch (IOException ioe) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Closing the server socket failed", ioe);
+                        } else {
+                            LOG.warn("Closing the server socket failed due {}", ioe.getMessage());
+                        }
+                    }
+                }
+            }
+        }
+
+        if (hostName == null) {
+            hostName = "localhost";
+        }
+        hostName = sanitizeHostName(hostName);
+
+        // Fall back to a timestamp-only stub when no port could be bound.
+        if (stub.length() == 0) {
+            stub = "-1-" + System.currentTimeMillis() + "-";
+        }
+        UNIQUE_STUB = stub;
+    }
+
+    /**
+     * Construct an IdGenerator whose ids start with the given prefix.
+     *
+     * @param prefix
+     *        the text placed at the start of every generated id.
+     */
+    public IdGenerator(String prefix) {
+        synchronized (UNIQUE_STUB) {
+            this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
+            // Initial StringBuilder capacity: the seed plus the longest possible sequence value.
+            this.length = this.seed.length() + ("" + Long.MAX_VALUE).length();
+        }
+    }
+
+    /**
+     * Construct an IdGenerator whose ids start with "ID:" followed by the local host name.
+     */
+    public IdGenerator() {
+        this("ID:" + hostName);
+    }
+
+    /**
+     * As we have to find the host name as a side-effect of generating a unique stub, we allow
+     * its easy retrieval here
+     *
+     * @return the local host name
+     */
+    public static String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * Generate a unique id
+     *
+     * @return a unique id made of this generator's seed followed by the next sequence value
+     */
+    public synchronized String generateId() {
+        StringBuilder sb = new StringBuilder(length);
+        sb.append(seed);
+        sb.append(sequence.getAndIncrement());
+        return sb.toString();
+    }
+
+    /**
+     * Removes every character with a code of 127 or above from the given host name.
+     *
+     * @param hostName
+     *        the host name to clean up.
+     *
+     * @return the host name without the removed characters, or the original String when
+     *         nothing had to be removed.
+     */
+    public static String sanitizeHostName(String hostName) {
+        boolean changed = false;
+
+        StringBuilder sb = new StringBuilder();
+        for (char ch : hostName.toCharArray()) {
+            // only include ASCII chars
+            if (ch < 127) {
+                sb.append(ch);
+            } else {
+                changed = true;
+            }
+        }
+
+        if (changed) {
+            String newHost = sb.toString();
+            LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost);
+            return newHost;
+        } else {
+            return hostName;
+        }
+    }
+
+    /**
+     * Generate a unique ID - that is friendly for a URL or file system
+     *
+     * @return a unique id with every ':', '_' and '.' replaced by '-'
+     */
+    public String generateSanitizedId() {
+        String result = generateId();
+        result = result.replace(':', '-');
+        result = result.replace('_', '-');
+        result = result.replace('.', '-');
+        return result;
+    }
+
+    /**
+     * From a generated id - return the seed (i.e. minus the count)
+     *
+     * @param id
+     *        the generated identifier
+     * @return the text before the last ':' in the id, or the id itself when it has no ':'
+     *         after its first character or nothing follows the last ':'
+     */
+    public static String getSeedFromId(String id) {
+        String result = id;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+            if (index > 0 && (index + 1) < id.length()) {
+                result = id.substring(0, index);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * From a generated id - return the generator count
+     *
+     * @param id
+     *        the generated identifier
+     * @return the number that follows the last ':' in the id, or -1 when the id is null
+     *         or has no such trailing part
+     *
+     * @throws NumberFormatException if the text after the last ':' is not a valid long.
+     */
+    public static long getSequenceFromId(String id) {
+        long result = -1;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+
+            if (index > 0 && (index + 1) < id.length()) {
+                String numStr = id.substring(index + 1, id.length());
+                result = Long.parseLong(numStr);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Does a proper compare on the Id's: the seeds are compared as Strings first and, when
+     * they are equal, the sequence numbers are compared numerically.
+     *
+     * @param id1
+     *        the first generated identifier
+     * @param id2
+     *        the second generated identifier
+     * @return 0 if equal, a positive value if id1 is greater than id2, a negative value if
+     *         id1 is less than id2; -1 when either id is null
+     */
+    public static int compare(String id1, String id2) {
+        int result = -1;
+        String seed1 = IdGenerator.getSeedFromId(id1);
+        String seed2 = IdGenerator.getSeedFromId(id2);
+        if (seed1 != null && seed2 != null) {
+            result = seed1.compareTo(seed2);
+            if (result == 0) {
+                long count1 = IdGenerator.getSequenceFromId(id1);
+                long count2 = IdGenerator.getSequenceFromId(id2);
+                // Compare explicitly: narrowing the long difference to int can flip the
+                // sign or turn a non-zero difference into zero.
+                result = count1 < count2 ? -1 : (count1 > count2 ? 1 : 0);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
new file mode 100644
index 0000000..9513892
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Helper for looking up the name of the local host.
+ */
+public class InetAddressUtil {
+
+    /**
+     * Looks up the local host name via {@link java.net.InetAddress#getLocalHost()}.
+     * In an environment where neither a proper DNS lookup nor an {@code /etc/hosts}
+     * entry exists for the host, that lookup fails with an exception such as:
+     * <code>
+     * java.net.UnknownHostException: &lt;hostname&gt;: &lt;hostname&gt;
+     *  at java.net.InetAddress.getLocalHost(InetAddress.java:1425)
+     *   ...
+     * </code>
+     * Rather than giving up, this method takes the text before the first colon of the
+     * exception message as the host name.  Only when the message has no usable text
+     * before a colon is the original {@code UnknownHostException} rethrown.
+     *
+     * @return The hostname
+     * @throws UnknownHostException if the lookup fails and no name can be taken from
+     *         the failure message.
+     * @see java.net.InetAddress#getLocalHost()
+     * @see java.net.InetAddress#getHostName()
+     */
+    public static String getLocalHostName() throws UnknownHostException {
+        InetAddress local;
+        try {
+            local = InetAddress.getLocalHost();
+        } catch (UnknownHostException uhe) {
+            // message looks like "hostname: hostname"
+            final String text = uhe.getMessage();
+            final int separator = (text == null) ? -1 : text.indexOf(':');
+            if (separator > 0) {
+                return text.substring(0, separator);
+            }
+            throw uhe;
+        }
+        return local.getHostName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
new file mode 100644
index 0000000..a44faf6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Queue based storage interface for inbound Messages.
+ */
+public interface MessageQueue {
+
+    /**
+     * Adds the given message envelope to the end of the Message queue.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    void enqueue(JmsInboundMessageDispatch envelope);
+
+    /**
+     * Adds the given message envelope to the front of the Message queue.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    void enqueueFirst(JmsInboundMessageDispatch envelope);
+
+    /**
+     * @return true if there are no messages in the queue.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns but does not remove the first element in the Message queue.
+     *
+     * @return the first element in the Queue or null if empty.
+     */
+    JmsInboundMessageDispatch peek();
+
+    /**
+     * Used to get an enqueued message. The amount of time this method blocks is
+     * based on the timeout value:
+     * <ul>
+     * <li>if timeout == -1 then it blocks until a message is received.</li>
+     * <li>if timeout == 0 then it tries to not block at all, it returns a message
+     * if one is available.</li>
+     * <li>if timeout &gt; 0 then it blocks up to timeout amount of time.</li>
+     * </ul>
+     * Expired messages will be consumed by this method.
+     *
+     * @param timeout
+     *        the wait policy as described above; the time unit is not stated by
+     *        this interface (presumably milliseconds - confirm against the
+     *        implementation).
+     *
+     * @return null if we timeout or if the consumer is closed.
+     *
+     * @throws InterruptedException if the wait is interrupted.
+     */
+    JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException;
+
+    /**
+     * Used to get an enqueued Message if one exists, otherwise returns null.
+     *
+     * @return the next Message in the Queue if one exists, otherwise null.
+     */
+    JmsInboundMessageDispatch dequeueNoWait();
+
+    /**
+     * Starts the Message Queue.  A non-started Queue will always return null for
+     * any of the Queue accessor methods.
+     */
+    void start();
+
+    /**
+     * Stops the Message Queue.  Messages cannot be read from the Queue when it is in
+     * the stopped state.
+     */
+    void stop();
+
+    /**
+     * @return true if the Queue is not in the stopped or closed state.
+     */
+    boolean isRunning();
+
+    /**
+     * Closes the Message Queue.  No messages can be added or removed from the Queue
+     * once it has entered the closed state.
+     */
+    void close();
+
+    /**
+     * @return true if the Queue has been closed.
+     */
+    boolean isClosed();
+
+    /**
+     * Returns the number of Messages currently in the Queue.  This value is only
+     * meaningful at the time of the call as the size of the Queue changes rapidly
+     * as Messages arrive and are consumed.
+     *
+     * @return the current number of Messages in the Queue.
+     */
+    int size();
+
+    /**
+     * Clears the Queue of any Messages.
+     */
+    void clear();
+
+    /**
+     * Removes and returns all Messages in the Queue.
+     *
+     * @return a List containing all Messages removed from the Queue.
+     */
+    List<JmsInboundMessageDispatch> removeAll();
+
+    /**
+     * @return the lock object used to protect against concurrent access.
+     */
+    Object getLock();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
new file mode 100644
index 0000000..e0c7357
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Simple Message Priority ordered Queue.  Message envelopes are stored in the
+ * Queue based on their priority value.
+ */
+public final class PriorityMessageQueue extends AbstractMessageQueue {
+
+    /**
+     * Number of distinct priority levels; one list is kept per level, indexed
+     * {@code 0} to {@code MAX_PRIORITY - 1}.
+     */
+    private static final int MAX_PRIORITY = 10;
+
+    private final LinkedList<JmsInboundMessageDispatch>[] lists;
+    private int size = 0;
+
+    /**
+     * Creates an empty queue with one backing list per priority level.
+     */
+    @SuppressWarnings("unchecked") // generic array creation; only typed lists are ever stored
+    public PriorityMessageQueue() {
+        this.lists = new LinkedList[MAX_PRIORITY];
+        for (int i = 0; i < MAX_PRIORITY; i++) {
+            lists[i] = new LinkedList<JmsInboundMessageDispatch>();
+        }
+    }
+
+    /**
+     * Appends the envelope to the end of the list for its priority level and wakes
+     * one thread waiting on the queue lock.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    @Override
+    public void enqueue(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            getList(envelope).addLast(envelope);
+            this.size++;
+            lock.notify();
+        }
+    }
+
+    /**
+     * Inserts the envelope at the front of the list for its priority level and
+     * wakes one thread waiting on the queue lock.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    @Override
+    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            getList(envelope).addFirst(envelope);
+            this.size++;
+            lock.notify();
+        }
+    }
+
+    /**
+     * @return true if no envelopes are held at any priority level.
+     */
+    @Override
+    public boolean isEmpty() {
+        synchronized (lock) {
+            return size == 0;
+        }
+    }
+
+    /**
+     * @return the total number of envelopes held across all priority levels.
+     */
+    @Override
+    public int size() {
+        synchronized (lock) {
+            return size;
+        }
+    }
+
+    /**
+     * Discards every envelope at every priority level.
+     */
+    @Override
+    public void clear() {
+        synchronized (lock) {
+            for (int i = 0; i < MAX_PRIORITY; i++) {
+                lists[i].clear();
+            }
+            this.size = 0;
+        }
+    }
+
+    /**
+     * Removes and returns all envelopes, highest priority level first and in list
+     * order within each level.
+     *
+     * @return a List containing all envelopes removed from the Queue.
+     */
+    @Override
+    public List<JmsInboundMessageDispatch> removeAll() {
+        synchronized (lock) {
+            // The lock is already held, so read the counter directly.
+            ArrayList<JmsInboundMessageDispatch> result = new ArrayList<JmsInboundMessageDispatch>(size);
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                List<JmsInboundMessageDispatch> list = lists[i];
+                result.addAll(list);
+                size -= list.size();
+                list.clear();
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Removes the first envelope of the highest non-empty priority level. This
+     * method does not synchronize itself.
+     *
+     * @return the removed envelope or null if the queue is empty.
+     */
+    @Override
+    protected JmsInboundMessageDispatch removeFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<JmsInboundMessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    this.size--;
+                    return list.removeFirst();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns, without removing it, the first envelope of the highest non-empty
+     * priority level. This method does not synchronize itself.
+     *
+     * @return the first envelope or null if the queue is empty.
+     */
+    @Override
+    protected JmsInboundMessageDispatch peekFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<JmsInboundMessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    return list.peekFirst();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Determines the list index for the given envelope. An envelope without a
+     * message gets the JMS default priority; otherwise the message priority is
+     * clamped to the range {@code 0} to {@code MAX_PRIORITY - 1}.
+     */
+    private int getPriority(JmsInboundMessageDispatch envelope) {
+        int priority = javax.jms.Message.DEFAULT_PRIORITY;
+        if (envelope.getMessage() != null) {
+            try {
+                priority = Math.max(envelope.getMessage().getJMSPriority(), 0);
+            } catch (JMSException ignored) {
+                // Best effort: if the priority cannot be read the default is kept.
+            }
+            priority = Math.min(priority, MAX_PRIORITY - 1);
+        }
+        return priority;
+    }
+
+    /**
+     * @return the backing list matching the priority of the given envelope.
+     */
+    private LinkedList<JmsInboundMessageDispatch> getList(JmsInboundMessageDispatch envelope) {
+        return lists[getPriority(envelope)];
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
new file mode 100644
index 0000000..8eb61d2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.beans.BeanInfo;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Utilities for properties
+ */
+public class PropertyUtil {
+
+    /**
+     * Creates a URI from the original URI and the given parameters.
+     *
+     * @param originalURI
+     *        The URI whose current parameters are remove and replaced with the given remainder
+     *        value.
+     * @param params
+     *        The URI params that should be used to replace the current ones in the target.
+     *
+     * @return a new URI that matches the original one but has its query options replaced with
+     *         the given ones.
+     *
+     * @throws URISyntaxException
+     */
+    public static URI replaceQuery(URI originalURI, Map<String, String> params) throws URISyntaxException {
+        String s = createQueryString(params);
+        if (s.length() == 0) {
+            s = null;
+        }
+        return replaceQuery(originalURI, s);
+    }
+
+    /**
+     * Creates a URI with the given query, removing an previous query value from the given URI.
+     *
+     * @param uri
+     *        The source URI whose existing query is replaced with the newly supplied one.
+     * @param query
+     *        The new URI query string that should be appended to the given URI.
+     *
+     * @return a new URI that is a combination of the original URI and the given query string.
+     * @throws URISyntaxException
+     */
+    public static URI replaceQuery(URI uri, String query) throws URISyntaxException {
+        String schemeSpecificPart = uri.getRawSchemeSpecificPart();
+        // strip existing query if any
+        int questionMark = schemeSpecificPart.lastIndexOf("?");
+        // make sure question mark is not within parentheses
+        if (questionMark < schemeSpecificPart.lastIndexOf(")")) {
+            questionMark = -1;
+        }
+        if (questionMark > 0) {
+            schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
+        }
+        if (query != null && query.length() > 0) {
+            schemeSpecificPart += "?" + query;
+        }
+        return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
+    }
+
+    /**
+     * Creates a URI with the given query, removing an previous query value from the given URI.
+     *
+     * @param uri
+     *        The source URI whose existing query is replaced with the newly supplied one.
+     * @param query
+     *        The new URI query string that should be appended to the given URI.
+     *
+     * @return a new URI that is a combination of the original URI and the given query string.
+     * @throws URISyntaxException
+     */
+    public static URI eraseQuery(URI uri) throws URISyntaxException {
+        return replaceQuery(uri, (String) null);
+    }
+
+    /**
+     * Given a key / value mapping, create and return a URI formatted query string that is valid
+     * and can be appended to a URI.
+     *
+     * @param options
+     *        The Mapping that will create the new Query string.
+     *
+     * @return a URI formatted query string.
+     *
+     * @throws URISyntaxException
+     */
+    public static String createQueryString(Map<String, ? extends Object> options) throws URISyntaxException {
+        try {
+            if (options.size() > 0) {
+                StringBuffer rc = new StringBuffer();
+                boolean first = true;
+                for (String key : options.keySet()) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        rc.append("&");
+                    }
+                    String value = (String) options.get(key);
+                    rc.append(URLEncoder.encode(key, "UTF-8"));
+                    rc.append("=");
+                    rc.append(URLEncoder.encode(value, "UTF-8"));
+                }
+                return rc.toString();
+            } else {
+                return "";
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    /**
+     * Get properties from a URI
+     *
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, String> parseParameters(URI uri) throws Exception {
+        return uri.getQuery() == null ? Collections.EMPTY_MAP : parseQuery(stripPrefix(uri.getQuery(), "?"));
+    }
+
+    /**
+     * Parse properties from a named resource -eg. a URI or a simple name e.g.
+     * foo?name="fred"&size=2
+     *
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, String> parseParameters(String uri) throws Exception {
+        return uri == null ? Collections.EMPTY_MAP : parseQuery(stripUpto(uri, '?'));
+    }
+
+    /**
+     * Get properties from a uri
+     *
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     *
+     * @throws Exception
+     */
+    public static Map<String, String> parseQuery(String uri) throws Exception {
+        if (uri != null) {
+            Map<String, String> rc = new HashMap<String, String>();
+            if (uri != null) {
+                String[] parameters = uri.split("&");
+                for (int i = 0; i < parameters.length; i++) {
+                    int p = parameters[i].indexOf("=");
+                    if (p >= 0) {
+                        String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
+                        String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
+                        rc.put(name, value);
+                    } else {
+                        rc.put(parameters[i], null);
+                    }
+                }
+            }
+            return rc;
+        }
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Given a map of properties, filter out only those prefixed with the given value, the
+     * values filtered are returned in a new Map instance.
+     *
+     * @param properties
+     *        The map of properties to filter.
+     * @param optionPrefix
+     *        The prefix value to use when filtering.
+     *
+     * @return a filter map with only values that match the given prefix.
+     */
+    public static Map<String, String> filterProperties(Map<String, String> props, String optionPrefix) {
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        HashMap<String, String> rc = new HashMap<String, String>(props.size());
+
+        for (Iterator<String> iter = props.keySet().iterator(); iter.hasNext();) {
+            String name = iter.next();
+            if (name.startsWith(optionPrefix)) {
+                String value = props.get(name);
+                name = name.substring(optionPrefix.length());
+                rc.put(name, value);
+                iter.remove();
+            }
+        }
+
+        return rc;
+    }
+
+    /**
+     * Add bean properties to a URI
+     *
+     * @param uri
+     * @param bean
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception {
+        Map<String, String> props = PropertyUtil.getProperties(bean);
+        return PropertyUtil.addPropertiesToURI(uri, props);
+    }
+
+    /**
+     * Add properties to a URI
+     *
+     * @param uri
+     * @param props
+     * @return uri with properties on
+     * @throws Exception
+     */
+    public static String addPropertiesToURI(URI uri, Map<String, String> props) throws Exception {
+        return addPropertiesToURI(uri.toString(), props);
+    }
+
+    /**
+     * Add properties to a URI
+     *
+     * @param uri
+     * @param props
+     * @return uri with properties on
+     * @throws Exception
+     */
+    public static String addPropertiesToURI(String uri, Map<String, String> props) throws Exception {
+        String result = uri;
+        if (uri != null && props != null) {
+            StringBuilder base = new StringBuilder(stripBefore(uri, '?'));
+            Map<String, String> map = parseParameters(uri);
+            if (!map.isEmpty()) {
+                map.putAll(props);
+            }
+            if (!map.isEmpty()) {
+                base.append('?');
+                boolean first = true;
+                for (Map.Entry<String, String> entry : map.entrySet()) {
+                    if (!first) {
+                        base.append('&');
+                    }
+                    first = false;
+                    base.append(entry.getKey()).append("=").append(entry.getValue());
+                }
+                result = base.toString();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Set properties on an object using the provided map. The return value indicates if all
+     * properties from the given map were set on the target object.
+     *
+     * @param target
+     *        the object whose properties are to be set from the map options.
+     * @param props
+     *        the properties that should be applied to the given object.
+     *
+     * @return true if all values in the props map were applied to the target object.
+     */
+    public static boolean setProperties(Object target, Map<String, String> props) {
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        int setCounter = 0;
+
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (setProperty(target, entry.getKey(), entry.getValue())) {
+                setCounter++;
+            }
+        }
+
+        return setCounter == props.size();
+    }
+
+    /**
+     * Get properties from an object
+     *
+     * @param object
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    public static Map<String, String> getProperties(Object object) throws Exception {
+        Map<String, String> props = new LinkedHashMap<String, String>();
+        BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
+        Object[] NULL_ARG = {};
+        PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+        if (propertyDescriptors != null) {
+            for (int i = 0; i < propertyDescriptors.length; i++) {
+                PropertyDescriptor pd = propertyDescriptors[i];
+                if (pd.getReadMethod() != null && !pd.getName().equals("class") && !pd.getName().equals("properties") && !pd.getName().equals("reference")) {
+                    Object value = pd.getReadMethod().invoke(object, NULL_ARG);
+                    if (value != null) {
+                        if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof URI || value instanceof URL) {
+                            props.put(pd.getName(), ("" + value));
+                        } else if (value instanceof SSLContext) {
+                            // ignore this one..
+                        } else {
+                            Map<String, String> inner = getProperties(value);
+                            for (Map.Entry<String, String> entry : inner.entrySet()) {
+                                props.put(pd.getName() + "." + entry.getKey(), entry.getValue());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return props;
+    }
+
+    /**
+     * Find a specific property getter in a given object based on a property name.
+     *
+     * @param object
+     *        the object to search.
+     * @param name
+     *        the property name to search for.
+     *
+     * @return the result of invoking the specific property get method.
+     *
+     * @throws Exception if an error occurs while searching the object's bean info.
+     */
+    public static Object getProperty(Object object, String name) throws Exception {
+        BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
+        PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+        if (propertyDescriptors != null) {
+            for (int i = 0; i < propertyDescriptors.length; i++) {
+                PropertyDescriptor pd = propertyDescriptors[i];
+                if (pd.getReadMethod() != null && pd.getName().equals(name)) {
+                    return pd.getReadMethod().invoke(object);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Set a property
+     *
+     * @param target
+     * @param name
+     * @param value
+     * @return true if set
+     */
+    public static boolean setProperty(Object target, String name, Object value) {
+        try {
+            int dotPos = name.indexOf(".");
+            while (dotPos >= 0) {
+                String getterName = name.substring(0, dotPos);
+                target = getProperty(target, getterName);
+                name = name.substring(dotPos + 1);
+                dotPos = name.indexOf(".");
+            }
+
+            Class<? extends Object> clazz = target.getClass();
+            Method setter = findSetterMethod(clazz, name);
+            if (setter == null) {
+                return false;
+            }
+            // If the type is null or it matches the needed type, just use the
+            // value directly
+            if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+                setter.invoke(target, new Object[] { value });
+            } else {
+                // We need to convert it
+                setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) });
+            }
+            return true;
+        } catch (Throwable ignore) {
+            return false;
+        }
+    }
+
+    /**
+     * Return a String past a prefix
+     *
+     * @param value
+     * @param prefix
+     * @return stripped
+     */
+    public static String stripPrefix(String value, String prefix) {
+        if (value.startsWith(prefix)) {
+            return value.substring(prefix.length());
+        }
+        return value;
+    }
+
+    /**
+     * Return a String from to a character
+     *
+     * @param value
+     * @param c
+     * @return stripped
+     */
+    public static String stripUpto(String value, char c) {
+        String result = null;
+        int index = value.indexOf(c);
+        if (index > 0) {
+            result = value.substring(index + 1);
+        }
+        return result;
+    }
+
+    /**
+     * Return a String up to and including character
+     *
+     * @param value
+     * @param c
+     * @return stripped
+     */
+    public static String stripBefore(String value, char c) {
+        String result = value;
+        int index = value.indexOf(c);
+        if (index > 0) {
+            result = value.substring(0, index);
+        }
+        return result;
+    }
+
+    private static Method findSetterMethod(Class<? extends Object> clazz, String name) {
+        // Build the method name.
+        name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            Class<? extends Object> params[] = method.getParameterTypes();
+            if (method.getName().equals(name) && params.length == 1) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private static Object convert(Object value, Class<?> type) throws Exception {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setAsText(value.toString());
+            return editor.getValue();
+        }
+        if (type == URI.class) {
+            return new URI(value.toString());
+        }
+        return null;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org