You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:39 UTC
[15/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
new file mode 100644
index 0000000..c7ba887
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An optimized buffered input stream for Tcp
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
+ protected byte internalBuffer[];
+ protected int count;
+ protected int position;
+
+ public TcpBufferedInputStream(InputStream in) {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
+
+ public TcpBufferedInputStream(InputStream in, int size) {
+ super(in);
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ internalBuffer = new byte[size];
+ }
+
+ protected void fill() throws IOException {
+ byte[] buffer = internalBuffer;
+ count = 0;
+ position = 0;
+ int n = in.read(buffer, position, buffer.length - position);
+ if (n > 0) {
+ count = n + position;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (position >= count) {
+ fill();
+ if (position >= count) {
+ return -1;
+ }
+ }
+ return internalBuffer[position++] & 0xff;
+ }
+
+ private int readStream(byte[] b, int off, int len) throws IOException {
+ int avail = count - position;
+ if (avail <= 0) {
+ if (len >= internalBuffer.length) {
+ return in.read(b, off, len);
+ }
+ fill();
+ avail = count - position;
+ if (avail <= 0) {
+ return -1;
+ }
+ }
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(internalBuffer, position, b, off, cnt);
+ position += cnt;
+ return cnt;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+ int n = 0;
+ for (;;) {
+ int nread = readStream(b, off + n, len - n);
+ if (nread <= 0) {
+ return (n == 0) ? nread : n;
+ }
+ n += nread;
+ if (n >= len) {
+ return n;
+ }
+ // if not closed but no bytes available, return
+ InputStream input = in;
+ if (input != null && input.available() <= 0) {
+ return n;
+ }
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
+ return 0;
+ }
+ long avail = count - position;
+ if (avail <= 0) {
+ return in.skip(n);
+ }
+ long skipped = (avail < n) ? avail : n;
+ position += skipped;
+ return skipped;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available() + (count - position);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
new file mode 100644
index 0000000..82f8c41
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An optimized buffered outputstream for Tcp
+ */
+public class TcpBufferedOutputStream extends FilterOutputStream {
+
+ private static final int BUFFER_SIZE = 8192;
+ private final byte[] buffer;
+ private final int bufferlen;
+ private int count;
+
+ /**
+ * Constructor
+ *
+ * @param out
+ */
+ public TcpBufferedOutputStream(OutputStream out) {
+ this(out, BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the specified underlying output
+ * stream with the specified buffer size.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param size
+ * the buffer size.
+ * @throws IllegalArgumentException
+ * if size <= 0.
+ */
+ public TcpBufferedOutputStream(OutputStream out, int size) {
+ super(out);
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buffer = new byte[size];
+ bufferlen = size;
+ }
+
+ /**
+ * write a byte on to the stream
+ *
+ * @param b
+ * - byte to write
+ * @throws IOException
+ */
+ @Override
+ public void write(int b) throws IOException {
+ if ((bufferlen - count) < 1) {
+ flush();
+ }
+ buffer[count++] = (byte) b;
+ }
+
+ /**
+ * write a byte array to the stream
+ *
+ * @param b
+ * the byte buffer
+ * @param off
+ * the offset into the buffer
+ * @param len
+ * the length of data to write
+ * @throws IOException
+ */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if (b != null) {
+ if ((bufferlen - count) < len) {
+ flush();
+ }
+ if (buffer.length >= len) {
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
+ } else {
+ out.write(b, off, len);
+ }
+ }
+ }
+
+ /**
+ * flush the data to the output stream This doesn't call flush on the underlying
+ * outputstream, because Tcp is particularly efficent at doing this itself ....
+ *
+ * @throws IOException
+ */
+ @Override
+ public void flush() throws IOException {
+ if (count > 0 && out != null) {
+ out.write(buffer, 0, count);
+ count = 0;
+ }
+ }
+
+ /**
+ * close this stream
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
new file mode 100644
index 0000000..4a58c8e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.VertxFactory;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.net.NetClient;
+import org.vertx.java.core.net.NetSocket;
+
+/**
+ * Vertex based TCP transport for raw data packets.
+ */
+public class TcpTransport implements Transport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+ private final Vertx vertx = VertxFactory.newVertx();
+ private final NetClient client = vertx.createNetClient();
+ private final URI remoteLocation;
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+ private NetSocket socket;
+
+ private TransportListener listener;
+ private int socketBufferSize = 64 * 1024;
+ private int soTimeout = -1;
+ private int connectTimeout = -1;
+ private int soLinger = Integer.MIN_VALUE;
+ private boolean keepAlive;
+ private boolean tcpNoDelay = true;
+
+ /**
+ * Create a new instance of the transport.
+ *
+ * @param listener
+ * The TransportListener that will receive data from this Transport instance.
+ * @param remoteLocation
+ * The remote location where this transport should connection to.
+ */
+ public TcpTransport(TransportListener listener, URI remoteLocation) {
+ this.listener = listener;
+ this.remoteLocation = remoteLocation;
+ }
+
+ @Override
+ public void connect() throws IOException {
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+
+ if (listener == null) {
+ throw new IllegalStateException("A transport listener must be set before connection attempts.");
+ }
+
+ configureNetClient(client);
+
+ try {
+ client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
+ @Override
+ public void handle(AsyncResult<NetSocket> asyncResult) {
+ if (asyncResult.succeeded()) {
+ socket = asyncResult.result();
+ LOG.info("We have connected! Socket is {}", socket);
+
+ connected.set(true);
+ connectLatch.countDown();
+
+ socket.dataHandler(new Handler<Buffer>() {
+ @Override
+ public void handle(Buffer event) {
+ listener.onData(event);
+ }
+ });
+
+ socket.closeHandler(new Handler<Void>() {
+ @Override
+ public void handle(Void event) {
+ connected.set(false);
+ listener.onTransportClosed();
+ }
+ });
+
+ socket.exceptionHandler(new Handler<Throwable>() {
+ @Override
+ public void handle(Throwable event) {
+ connected.set(false);
+ listener.onTransportError(event);
+ }
+ });
+
+ } else {
+ connected.set(false);
+ connectionError.set(asyncResult.cause());
+ connectLatch.countDown();
+ }
+ }
+ });
+ } catch (Throwable reason) {
+ LOG.info("Failed to connect to target Broker: {}", reason);
+ throw IOExceptionSupport.create(reason);
+ }
+
+ try {
+ connectLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (connectionError.get() != null) {
+ throw IOExceptionSupport.create(connectionError.get());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ if (connected.get()) {
+ socket.close();
+ connected.set(false);
+ }
+
+ vertx.stop();
+ }
+ }
+
+ @Override
+ public void send(ByteBuffer output) throws IOException {
+ checkConnected();
+ int length = output.remaining();
+ if (length == 0) {
+ return;
+ }
+
+ byte[] copy = new byte[length];
+ output.get(copy);
+ Buffer sendBuffer = new Buffer(copy);
+
+ vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+ }
+
+ @Override
+ public void send(org.fusesource.hawtbuf.Buffer output) throws IOException {
+ checkConnected();
+ int length = output.length();
+ if (length == 0) {
+ return;
+ }
+
+ org.fusesource.hawtbuf.Buffer clone = output.deepCopy();
+ Buffer sendBuffer = new Buffer(clone.data);
+ vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+ }
+
+ /**
+ * Allows a subclass to configure the NetClient beyond what this transport might do.
+ *
+ * @throws IOException if an error occurs.
+ */
+ protected void configureNetClient(NetClient client) throws IOException {
+ client.setSendBufferSize(getSocketBufferSize());
+ client.setReceiveBufferSize(getSocketBufferSize());
+ client.setSoLinger(soLinger);
+ client.setTCPKeepAlive(keepAlive);
+ client.setTCPNoDelay(tcpNoDelay);
+ if (connectTimeout >= 0) {
+ client.setConnectTimeout(connectTimeout);
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return this.connected.get();
+ }
+
+ private void checkConnected() throws IOException {
+ if (!connected.get()) {
+ throw new IOException("Cannot send to a non-connected transport.");
+ }
+ }
+
+ @Override
+ public TransportListener getTransportListener() {
+ return this.listener;
+ }
+
+ @Override
+ public void setTransportListener(TransportListener listener) {
+ if (listener == null) {
+ throw new IllegalArgumentException("Listener cannot be set to null");
+ }
+
+ this.listener = listener;
+ }
+
+ public int getSocketBufferSize() {
+ return socketBufferSize;
+ }
+
+ public void setSocketBufferSize(int socketBufferSize) {
+ this.socketBufferSize = socketBufferSize;
+ }
+
+ public int getSoTimeout() {
+ return soTimeout;
+ }
+
+ public void setSoTimeout(int soTimeout) {
+ this.soTimeout = soTimeout;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public int getSoLinger() {
+ return soLinger;
+ }
+
+ public void setSoLinger(int soLinger) {
+ this.soLinger = soLinger;
+ }
+
+ public boolean isKeepAlive() {
+ return keepAlive;
+ }
+
+ public void setKeepAlive(boolean keepAlive) {
+ this.keepAlive = keepAlive;
+ }
+
+ public int getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(int connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
new file mode 100644
index 0000000..4cced80
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Base class for all QpidJMS Transport instances.
+ */
+public interface Transport {
+
+ /**
+ * Performs the protocol connect operation for the implemented Transport type
+ * such as a TCP socket connection etc.
+ *
+ * @throws IOException if an error occurs while attempting the connect.
+ */
+ void connect() throws IOException;
+
+ /**
+ * @return true if transport is connected or false if the connection is down.
+ */
+ boolean isConnected();
+
+ /**
+ * Close the Transport, no additional send operations are accepted.
+ *
+ * @throws IOException if an error occurs while closing the connection.
+ */
+ void close() throws IOException;
+
+ /**
+ * Sends a chunk of data over the Transport connection.
+ *
+ * @param output
+ * The buffer of data that is to be transmitted.
+ *
+ * @throws IOException if an error occurs during the send operation.
+ */
+ void send(ByteBuffer output) throws IOException;
+
+ /**
+ * Sends a chunk of data over the Transport connection.
+ *
+ * @param output
+ * The buffer of data that is to be transmitted.
+ *
+ * @throws IOException if an error occurs during the send operation.
+ */
+ void send(Buffer output) throws IOException;
+
+ /**
+ * Gets the currently set TransportListener instance
+ *
+ * @returns the current TransportListener or null if none set.
+ */
+ TransportListener getTransportListener();
+
+ /**
+ * Sets the Transport Listener instance that will be notified of incoming data or
+ * error events.
+ *
+ * @param listener
+ * The new TransportListener instance to use (cannot be null).
+ *
+ * @throws IllegalArgumentException if the given listener is null.
+ */
+ void setTransportListener(TransportListener listener);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
new file mode 100644
index 0000000..f244347
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ * Listener interface that should be implemented by users of the various
+ * QpidJMS Transport classes.
+ */
+public interface TransportListener {
+
+ /**
+ * Called when new incoming data has become available.
+ *
+ * @param incoming
+ * the next incoming packet of data.
+ */
+ void onData(Buffer incoming);
+
+ /**
+ * Called if the connection state becomes closed.
+ */
+ void onTransportClosed();
+
+ /**
+ * Called when an error occurs during normal Transport operations.
+ *
+ * @param cause
+ * the error that triggered this event.
+ */
+ void onTransportError(Throwable cause);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
new file mode 100644
index 0000000..f20c21f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Abstract Message Queue class used to implement the common functions of a Message Queue
+ * instance.
+ */
+public abstract class AbstractMessageQueue implements MessageQueue {
+
+ protected boolean closed;
+ protected boolean running;
+ protected Object lock = new Object();
+
+ @Override
+ public JmsInboundMessageDispatch peek() {
+ synchronized (lock) {
+ return peekFirst();
+ }
+ }
+
+ @Override
+ public JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException {
+ synchronized (lock) {
+ // Wait until the consumer is ready to deliver messages.
+ while (timeout != 0 && !closed && (isEmpty() || !running)) {
+ if (timeout == -1) {
+ lock.wait();
+ } else {
+ lock.wait(timeout);
+ break;
+ }
+ }
+
+ if (closed || !running || isEmpty()) {
+ return null;
+ }
+
+ return removeFirst();
+ }
+ }
+
+ @Override
+ public JmsInboundMessageDispatch dequeueNoWait() {
+ synchronized (lock) {
+ if (closed || !running || isEmpty()) {
+ return null;
+ }
+ return removeFirst();
+ }
+ }
+
+ @Override
+ public void start() {
+ synchronized (lock) {
+ running = true;
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public void stop() {
+ synchronized (lock) {
+ running = false;
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void close() {
+ synchronized (lock) {
+ if (!closed) {
+ running = false;
+ closed = true;
+ }
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public Object getLock() {
+ return lock;
+ }
+
+ /**
+ * Removes and returns the first entry in the implementation queue. This method
+ * is always called under lock and does not need to protect itself or check running
+ * state etc.
+ *
+ * @return the first message queued in the implemented queue.
+ */
+ protected abstract JmsInboundMessageDispatch removeFirst();
+
+ /**
+ * Returns but does not remove the first entry in the implementation queue. This method
+ * is always called under lock and does not need to protect itself or check running
+ * state etc.
+ *
+ * @return the first message queued in the implemented queue.
+ */
+ protected abstract JmsInboundMessageDispatch peekFirst();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
new file mode 100644
index 0000000..8632ee2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClassLoadingAwareObjectInputStream.class);
+ private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+
+ private final ClassLoader inLoader;
+
+ public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+ super(in);
+ inLoader = in.getClass().getClassLoader();
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ return load(classDesc.getName(), cl, inLoader);
+ }
+
+ @Override
+ protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Class<?>[] cinterfaces = new Class[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ cinterfaces[i] = load(interfaces[i], cl);
+ }
+
+ try {
+ return Proxy.getProxyClass(cl, cinterfaces);
+ } catch (IllegalArgumentException e) {
+ try {
+ return Proxy.getProxyClass(inLoader, cinterfaces);
+ } catch (IllegalArgumentException e1) {
+ // ignore
+ }
+ try {
+ return Proxy.getProxyClass(FALLBACK_CLASS_LOADER, cinterfaces);
+ } catch (IllegalArgumentException e2) {
+ // ignore
+ }
+
+ throw new ClassNotFoundException(null, e);
+ }
+ }
+
+ private Class<?> load(String className, ClassLoader... cl) throws ClassNotFoundException {
+ // check for simple types first
+ final Class<?> clazz = loadSimpleType(className);
+ if (clazz != null) {
+ LOG.trace("Loaded class: {} as simple type -> ", className, clazz);
+ return clazz;
+ }
+
+ // try the different class loaders
+ for (ClassLoader loader : cl) {
+ LOG.trace("Attempting to load class: {} using classloader: {}", className, cl);
+ try {
+ Class<?> answer = Class.forName(className, false, loader);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Loaded class: {} using classloader: {} -> ", new Object[] { className, cl, answer });
+ }
+ return answer;
+ } catch (ClassNotFoundException e) {
+ LOG.trace("Class not found: {} using classloader: {}", className, cl);
+ // ignore
+ }
+ }
+
+ // and then the fallback class loader
+ return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+ }
+
+ /**
+ * Load a simple type
+ *
+ * @param name
+ * the name of the class to load
+ * @return the class or <tt>null</tt> if it could not be loaded
+ */
+ public static Class<?> loadSimpleType(String name) {
+ if ("java.lang.byte[]".equals(name) || "byte[]".equals(name)) {
+ return byte[].class;
+ } else if ("java.lang.Byte[]".equals(name) || "Byte[]".equals(name)) {
+ return Byte[].class;
+ } else if ("java.lang.Object[]".equals(name) || "Object[]".equals(name)) {
+ return Object[].class;
+ } else if ("java.lang.String[]".equals(name) || "String[]".equals(name)) {
+ return String[].class;
+ // and these is common as well
+ } else if ("java.lang.String".equals(name) || "String".equals(name)) {
+ return String.class;
+ } else if ("java.lang.Boolean".equals(name) || "Boolean".equals(name)) {
+ return Boolean.class;
+ } else if ("boolean".equals(name)) {
+ return boolean.class;
+ } else if ("java.lang.Integer".equals(name) || "Integer".equals(name)) {
+ return Integer.class;
+ } else if ("int".equals(name)) {
+ return int.class;
+ } else if ("java.lang.Long".equals(name) || "Long".equals(name)) {
+ return Long.class;
+ } else if ("long".equals(name)) {
+ return long.class;
+ } else if ("java.lang.Short".equals(name) || "Short".equals(name)) {
+ return Short.class;
+ } else if ("short".equals(name)) {
+ return short.class;
+ } else if ("java.lang.Byte".equals(name) || "Byte".equals(name)) {
+ return Byte.class;
+ } else if ("byte".equals(name)) {
+ return byte.class;
+ } else if ("java.lang.Float".equals(name) || "Float".equals(name)) {
+ return Float.class;
+ } else if ("float".equals(name)) {
+ return float.class;
+ } else if ("java.lang.Double".equals(name) || "Double".equals(name)) {
+ return Double.class;
+ } else if ("double".equals(name)) {
+ return double.class;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
new file mode 100644
index 0000000..b8db705
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Factory finding helper class used to locate objects that serve as Factories for
+ * other Object types. The search an instantiate mechanism is configurable so that
+ * in a non-stand-alone environment such as OSGI the finder and be configured to work.
+ */
+public class FactoryFinder<T extends Object> {
+
+ /**
+ * The strategy that the FactoryFinder uses to find load and instantiate Objects can be
+ * changed out by calling the
+ * {@link org.apache.qpid.jms.util.FactoryFinder#setObjectFactory(org.apache.qpid.jms.util.FactoryFinder.ObjectFactory)}
+ * method with a custom implementation of ObjectFactory.
+ *
+ * The default ObjectFactory is typically changed out when running in a specialized
+ * container environment where service discovery needs to be done via the container system.
+ * For example, in an OSGi scenario.
+ */
+ public interface ObjectFactory {
+
+ /**
+ * Creates the requested factory instance.
+ *
+ * @param path
+ * the full service path
+ *
+ * @return instance of the factory object being searched for.
+ *
+ * @throws IllegalAccessException if an error occurs while accessing the search path.
+ * @throws InstantiationException if the factory object fails on create.
+ * @throws IOException if the search encounter an IO error.
+ * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+ */
+ public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;
+
+ }
+
+ private static ObjectFactory objectFactory = new StandaloneObjectFactory();
+
+ private final ConcurrentHashMap<String, T> cachedFactories = new ConcurrentHashMap<String, T>();
+ private final String path;
+ private final Class<T> factoryType;
+
+ /**
+ * Creates a new instance of the FactoryFinder using the given search path.
+ *
+ * @param path
+ * The path to use when searching for the factory definitions.
+ */
+ public FactoryFinder(Class<T> factoryType, String path) {
+ this.path = path;
+ this.factoryType = factoryType;
+ }
+
+ /**
+ * @return the currently configured ObjectFactory instance used to locate the Factory objects.
+ */
+ public static ObjectFactory getObjectFactory() {
+ return objectFactory;
+ }
+
+ /**
+ * Sets the ObjectFactory instance to use when searching for the Factory class. This allows
+ * the default instance to be overridden in an environment where the basic version will not
+ * work.
+ *
+ * @param objectFactory
+ * the new object factory to use when searching for a Factory instance.
+ */
+ public static void setObjectFactory(ObjectFactory objectFactory) {
+ FactoryFinder.objectFactory = objectFactory;
+ }
+
+ /**
+ * Creates a new instance of the given key. The method first checks the cache of previously
+ * found factory instances for one that matches the key. If no cached version exists then
+ * the factory will be searched for using the configured ObjectFactory instance.
+ *
+ * @param key
+ * is the key to add to the path to find a text file containing the factory name
+ *
+ * @return a newly created instance
+ *
+ * @throws IllegalAccessException if an error occurs while accessing the search path.
+ * @throws InstantiationException if the factory object fails on create.
+ * @throws IOException if the search encounter an IO error.
+ * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+ * @throws ClassCastException if the found object is not assignable to the request factory type.
+ */
+ public T newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException, ClassCastException {
+ T factory = cachedFactories.get(key);
+ if (factory == null) {
+
+ Object found = objectFactory.create(path + key);
+ if (found != null && factoryType.isInstance(found)) {
+ factory = factoryType.cast(found);
+ cachedFactories.put(key, factory);
+ } else {
+ throw new ClassCastException("Cannot cast " + found.getClass().getName() +
+ " to " + factoryType.getName());
+ }
+ }
+
+ return factory;
+ }
+
+ /**
+ * Allow registration of a Provider factory without wiring via META-INF classes
+ *
+ * @param scheme
+ * The URI scheme value that names the target Provider instance.
+ * @param factory
+ * The factory to register in this finder.
+ */
+ public void registerProviderFactory(String scheme, T factory) {
+ cachedFactories.put(scheme, factory);
+ }
+
+ /**
+ * The default implementation of Object factory which works well in stand-alone applications.
+ */
+ @SuppressWarnings("rawtypes")
+ protected static class StandaloneObjectFactory implements ObjectFactory {
+ final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
+
+ @Override
+ public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
+ Class clazz = classMap.get(path);
+ if (clazz == null) {
+ clazz = loadClass(loadProperties(path));
+ classMap.put(path, clazz);
+ }
+ return clazz.newInstance();
+ }
+
+ static public Class loadClass(Properties properties) throws ClassNotFoundException, IOException {
+
+ String className = properties.getProperty("class");
+ if (className == null) {
+ throw new IOException("Expected property is missing: class");
+ }
+ Class clazz = null;
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader != null) {
+ try {
+ clazz = loader.loadClass(className);
+ } catch (ClassNotFoundException e) {
+ // ignore
+ }
+ }
+ if (clazz == null) {
+ clazz = FactoryFinder.class.getClassLoader().loadClass(className);
+ }
+
+ return clazz;
+ }
+
+ static public Properties loadProperties(String uri) throws IOException {
+ // lets try the thread context class loader first
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = StandaloneObjectFactory.class.getClassLoader();
+ }
+ InputStream in = classLoader.getResourceAsStream(uri);
+ if (in == null) {
+ in = FactoryFinder.class.getClassLoader().getResourceAsStream(uri);
+ if (in == null) {
+ throw new IOException("Could not find factory class for resource: " + uri);
+ }
+ }
+
+ // lets load the file
+ BufferedInputStream reader = null;
+ try {
+ reader = new BufferedInputStream(in);
+ Properties properties = new Properties();
+ properties.load(reader);
+ return properties;
+ } finally {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
new file mode 100644
index 0000000..b50e480
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Simple first in / first out Message Queue.
+ */
+public final class FifoMessageQueue extends AbstractMessageQueue {
+
+ protected final LinkedList<JmsInboundMessageDispatch> list = new LinkedList<JmsInboundMessageDispatch>();
+
+ @Override
+ public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+ synchronized (lock) {
+ list.addFirst(envelope);
+ lock.notify();
+ }
+ }
+
+ @Override
+ public void enqueue(JmsInboundMessageDispatch envelope) {
+ synchronized (lock) {
+ list.addLast(envelope);
+ lock.notify();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ synchronized (lock) {
+ return list.isEmpty();
+ }
+ }
+
+ @Override
+ public int size() {
+ synchronized (lock) {
+ return list.size();
+ }
+ }
+
+ @Override
+ public void clear() {
+ synchronized (lock) {
+ list.clear();
+ }
+ }
+
+ @Override
+ public List<JmsInboundMessageDispatch> removeAll() {
+ synchronized (lock) {
+ ArrayList<JmsInboundMessageDispatch> rc = new ArrayList<JmsInboundMessageDispatch>(list.size());
+ for (JmsInboundMessageDispatch entry : list) {
+ rc.add(entry);
+ }
+ list.clear();
+ return rc;
+ }
+ }
+
+ @Override
+ public String toString() {
+ synchronized (lock) {
+ return list.toString();
+ }
+ }
+
+ @Override
+ protected JmsInboundMessageDispatch removeFirst() {
+ return list.removeFirst();
+ }
+
+ @Override
+ protected JmsInboundMessageDispatch peekFirst() {
+ return list.peekFirst();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
new file mode 100644
index 0000000..6caf15c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+
+/**
+ * Used to make throwing IOException instances easier.
+ */
+public class IOExceptionSupport {
+
+ /**
+ * Checks the given cause to determine if it's already an IOException type and
+ * if not creates a new IOException to wrap it.
+ *
+ * @param cause
+ * The initiating exception that should be cast or wrapped.
+ *
+ * @return an IOException instance.
+ */
+ public static IOException create(Throwable cause) {
+ if (cause instanceof IOException) {
+ return (IOException) cause;
+ }
+
+ String message = cause.getMessage();
+ if (message == null || message.length() == 0) {
+ message = cause.toString();
+ }
+
+ return new IOException(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
new file mode 100644
index 0000000..f07b5b4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generator for Globally unique Strings.
+ */
+public class IdGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IdGenerator.class);
+ private static final String UNIQUE_STUB;
+ private static int instanceCount;
+ private static String hostName;
+ private String seed;
+ private final AtomicLong sequence = new AtomicLong(1);
+ private int length;
+ public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";
+
+ static {
+ String stub = "";
+ boolean canAccessSystemProps = true;
+ try {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPropertiesAccess();
+ }
+ } catch (SecurityException se) {
+ canAccessSystemProps = false;
+ }
+
+ if (canAccessSystemProps) {
+ int idGeneratorPort = 0;
+ ServerSocket ss = null;
+ try {
+ idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0"));
+ LOG.trace("Using port {}", idGeneratorPort);
+ hostName = InetAddressUtil.getLocalHostName();
+ ss = new ServerSocket(idGeneratorPort);
+ stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
+ Thread.sleep(100);
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("could not generate unique stub by using DNS and binding to local port", e);
+ } else {
+ LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage());
+ }
+
+ // Restore interrupted state so higher level code can deal with it.
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException ioe) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing the server socket failed", ioe);
+ } else {
+ LOG.warn("Closing the server socket failed" + " due " + ioe.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ if (hostName == null) {
+ hostName = "localhost";
+ }
+ hostName = sanitizeHostName(hostName);
+
+ if (stub.length() == 0) {
+ stub = "-1-" + System.currentTimeMillis() + "-";
+ }
+ UNIQUE_STUB = stub;
+ }
+
+ /**
+ * Construct an IdGenerator
+ */
+ public IdGenerator(String prefix) {
+ synchronized (UNIQUE_STUB) {
+ this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
+ this.length = this.seed.length() + ("" + Long.MAX_VALUE).length();
+ }
+ }
+
+ public IdGenerator() {
+ this("ID:" + hostName);
+ }
+
+ /**
+ * As we have to find the host name as a side-affect of generating a unique stub, we allow
+ * it's easy retrieval here
+ *
+ * @return the local host name
+ */
+ public static String getHostName() {
+ return hostName;
+ }
+
+ /**
+ * Generate a unique id
+ *
+ * @return a unique id
+ */
+ public synchronized String generateId() {
+ StringBuilder sb = new StringBuilder(length);
+ sb.append(seed);
+ sb.append(sequence.getAndIncrement());
+ return sb.toString();
+ }
+
+ public static String sanitizeHostName(String hostName) {
+ boolean changed = false;
+
+ StringBuilder sb = new StringBuilder();
+ for (char ch : hostName.toCharArray()) {
+ // only include ASCII chars
+ if (ch < 127) {
+ sb.append(ch);
+ } else {
+ changed = true;
+ }
+ }
+
+ if (changed) {
+ String newHost = sb.toString();
+ LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost);
+ return newHost;
+ } else {
+ return hostName;
+ }
+ }
+
+ /**
+ * Generate a unique ID - that is friendly for a URL or file system
+ *
+ * @return a unique id
+ */
+ public String generateSanitizedId() {
+ String result = generateId();
+ result = result.replace(':', '-');
+ result = result.replace('_', '-');
+ result = result.replace('.', '-');
+ return result;
+ }
+
+ /**
+ * From a generated id - return the seed (i.e. minus the count)
+ *
+ * @param id
+ * the generated identifier
+ * @return the seed
+ */
+ public static String getSeedFromId(String id) {
+ String result = id;
+ if (id != null) {
+ int index = id.lastIndexOf(':');
+ if (index > 0 && (index + 1) < id.length()) {
+ result = id.substring(0, index);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * From a generated id - return the generator count
+ *
+ * @param id
+ * @return the count
+ */
+ public static long getSequenceFromId(String id) {
+ long result = -1;
+ if (id != null) {
+ int index = id.lastIndexOf(':');
+
+ if (index > 0 && (index + 1) < id.length()) {
+ String numStr = id.substring(index + 1, id.length());
+ result = Long.parseLong(numStr);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Does a proper compare on the Id's
+ *
+ * @param id1
+ * @param id2
+ * @return 0 if equal else a positive if id1 is > id2 ...
+ */
+ public static int compare(String id1, String id2) {
+ int result = -1;
+ String seed1 = IdGenerator.getSeedFromId(id1);
+ String seed2 = IdGenerator.getSeedFromId(id2);
+ if (seed1 != null && seed2 != null) {
+ result = seed1.compareTo(seed2);
+ if (result == 0) {
+ long count1 = IdGenerator.getSequenceFromId(id1);
+ long count2 = IdGenerator.getSequenceFromId(id2);
+ result = (int) (count1 - count2);
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
new file mode 100644
index 0000000..9513892
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class InetAddressUtil {
+
+ /**
+ * When using the {@link java.net.InetAddress#getHostName()} method in an
+ * environment where neither a proper DNS lookup nor an <tt>/etc/hosts</tt>
+ * entry exists for a given host, the following exception will be thrown:
+ * <code>
+ * java.net.UnknownHostException: <hostname>: <hostname>
+ * at java.net.InetAddress.getLocalHost(InetAddress.java:1425)
+ * ...
+ * </code>
+ * Instead of just throwing an UnknownHostException and giving up, this
+ * method grabs a suitable hostname from the exception and prevents the
+ * exception from being thrown. If a suitable hostname cannot be acquired
+ * from the exception, only then is the <tt>UnknownHostException</tt> thrown.
+ *
+ * @return The hostname
+ * @throws UnknownHostException
+ * @see {@link java.net.InetAddress#getLocalHost()}
+ * @see {@link java.net.InetAddress#getHostName()}
+ */
+ public static String getLocalHostName() throws UnknownHostException {
+ try {
+ return (InetAddress.getLocalHost()).getHostName();
+ } catch (UnknownHostException uhe) {
+ String host = uhe.getMessage(); // host = "hostname: hostname"
+ if (host != null) {
+ int colon = host.indexOf(':');
+ if (colon > 0) {
+ return host.substring(0, colon);
+ }
+ }
+ throw uhe;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
new file mode 100644
index 0000000..a44faf6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Queue based storage interface for inbound Messages.
+ */
+public interface MessageQueue {
+
+ /**
+ * Adds the given message envelope to the end of the Message queue.
+ *
+ * @param envelope
+ * The in-bound Message envelope to enqueue.
+ */
+ void enqueue(JmsInboundMessageDispatch envelope);
+
+ /**
+ * Adds the given message envelope to the front of the Message queue.
+ *
+ * @param envelope
+ * The in-bound Message envelope to enqueue.
+ */
+ void enqueueFirst(JmsInboundMessageDispatch envelope);
+
+ /**
+ * @return true if there are no messages in the queue.
+ */
+ boolean isEmpty();
+
+ /**
+ * Return but do not remove the first element in the Message queue.
+ *
+ * @return the first element in the Queue or null if empty.
+ */
+ JmsInboundMessageDispatch peek();
+
+ /**
+ * Used to get an enqueued message. The amount of time this method blocks is
+ * based on the timeout value. - if timeout==-1 then it blocks until a
+ * message is received. - if timeout==0 then it it tries to not block at
+ * all, it returns a message if it is available - if timeout>0 then it
+ * blocks up to timeout amount of time. Expired messages will consumed by
+ * this method.
+ *
+ * @return null if we timeout or if the consumer is closed.
+ *
+ * @throws InterruptedException if the wait is interrupted.
+ */
+ JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException;
+
+ /**
+ * Used to get an enqueued Message if on exists, otherwise returns null.
+ *
+ * @return the next Message in the Queue if one exists, otherwise null.
+ */
+ JmsInboundMessageDispatch dequeueNoWait();
+
+ /**
+ * Starts the Message Queue. An non-started Queue will always return null for
+ * any of the Queue accessor methods.
+ */
+ void start();
+
+ /**
+ * Stops the Message Queue. Messages cannot be read from the Queue when it is in
+ * the stopped state.
+ */
+ void stop();
+
+ /**
+ * @return true if the Queue is not in the stopped or closed state.
+ */
+ boolean isRunning();
+
+ /**
+ * Closes the Message Queue. No messages can be added or removed from the Queue
+ * once it has entered the closed state.
+ */
+ void close();
+
+ /**
+ * @return true if the Queue has been closed.
+ */
+ boolean isClosed();
+
+ /**
+ * Returns the number of Messages currently in the Queue. This value is only
+ * meaningful at the time of the call as the size of the Queue changes rapidly
+ * as Messages arrive and are consumed.
+ *
+ * @return the current number of Messages in the Queue.
+ */
+ int size();
+
+ /**
+ * Clears the Queue of any Messages.
+ */
+ void clear();
+
+ /**
+ * Removes and returns all Messages in the Queue.
+ *
+ * @return a List containing all Messages removed from the Queue.
+ */
+ List<JmsInboundMessageDispatch> removeAll();
+
+ /**
+ * @return the lock object used to protect against concurrent access.
+ */
+ Object getLock();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
new file mode 100644
index 0000000..e0c7357
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Simple Message Priority ordered Queue. Message envelopes are stored in the
+ * Queue based on their priority value.
+ */
+public final class PriorityMessageQueue extends AbstractMessageQueue {
+
+ private static final Integer MAX_PRIORITY = 10;
+
+ private final LinkedList<JmsInboundMessageDispatch>[] lists;
+ private int size = 0;
+
+ @SuppressWarnings("unchecked")
+ public PriorityMessageQueue() {
+ this.lists = new LinkedList[MAX_PRIORITY];
+ for (int i = 0; i < MAX_PRIORITY; i++) {
+ lists[i] = new LinkedList<JmsInboundMessageDispatch>();
+ }
+ }
+
+ @Override
+ public void enqueue(JmsInboundMessageDispatch envelope) {
+ synchronized (lock) {
+ getList(envelope).addLast(envelope);
+ this.size++;
+ lock.notify();
+ }
+ }
+
+ @Override
+ public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+ synchronized (lock) {
+ getList(envelope).addFirst(envelope);
+ this.size++;
+ lock.notify();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ synchronized (lock) {
+ return size == 0;
+ }
+ }
+
+ @Override
+ public int size() {
+ synchronized (lock) {
+ return size;
+ }
+ }
+
+ @Override
+ public void clear() {
+ synchronized (lock) {
+ for (int i = 0; i < MAX_PRIORITY; i++) {
+ lists[i].clear();
+ }
+ this.size = 0;
+ }
+ }
+
+ @Override
+ public List<JmsInboundMessageDispatch> removeAll() {
+ synchronized (lock) {
+ ArrayList<JmsInboundMessageDispatch> result = new ArrayList<JmsInboundMessageDispatch>(size());
+ for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+ List<JmsInboundMessageDispatch> list = lists[i];
+ result.addAll(list);
+ size -= list.size();
+ list.clear();
+ }
+ return result;
+ }
+ }
+
+ @Override
+ protected JmsInboundMessageDispatch removeFirst() {
+ if (this.size > 0) {
+ for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+ LinkedList<JmsInboundMessageDispatch> list = lists[i];
+ if (!list.isEmpty()) {
+ this.size--;
+ return list.removeFirst();
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected JmsInboundMessageDispatch peekFirst() {
+ if (this.size > 0) {
+ for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+ LinkedList<JmsInboundMessageDispatch> list = lists[i];
+ if (!list.isEmpty()) {
+ return list.peekFirst();
+ }
+ }
+ }
+ return null;
+ }
+
+ private int getPriority(JmsInboundMessageDispatch envelope) {
+ int priority = javax.jms.Message.DEFAULT_PRIORITY;
+ if (envelope.getMessage() != null) {
+ try {
+ priority = Math.max(envelope.getMessage().getJMSPriority(), 0);
+ } catch (JMSException e) {
+ }
+ priority = Math.min(priority, 9);
+ }
+ return priority;
+ }
+
+ private LinkedList<JmsInboundMessageDispatch> getList(JmsInboundMessageDispatch envelope) {
+ return lists[getPriority(envelope)];
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
new file mode 100644
index 0000000..8eb61d2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.beans.BeanInfo;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Utilities for properties
+ */
+public class PropertyUtil {
+
+ /**
+ * Creates a URI from the original URI and the given parameters.
+ *
+ * @param originalURI
+ * The URI whose current parameters are remove and replaced with the given remainder
+ * value.
+ * @param params
+ * The URI params that should be used to replace the current ones in the target.
+ *
+ * @return a new URI that matches the original one but has its query options replaced with
+ * the given ones.
+ *
+ * @throws URISyntaxException
+ */
+ public static URI replaceQuery(URI originalURI, Map<String, String> params) throws URISyntaxException {
+ String s = createQueryString(params);
+ if (s.length() == 0) {
+ s = null;
+ }
+ return replaceQuery(originalURI, s);
+ }
+
+ /**
+ * Creates a URI with the given query, removing an previous query value from the given URI.
+ *
+ * @param uri
+ * The source URI whose existing query is replaced with the newly supplied one.
+ * @param query
+ * The new URI query string that should be appended to the given URI.
+ *
+ * @return a new URI that is a combination of the original URI and the given query string.
+ * @throws URISyntaxException
+ */
+ public static URI replaceQuery(URI uri, String query) throws URISyntaxException {
+ String schemeSpecificPart = uri.getRawSchemeSpecificPart();
+ // strip existing query if any
+ int questionMark = schemeSpecificPart.lastIndexOf("?");
+ // make sure question mark is not within parentheses
+ if (questionMark < schemeSpecificPart.lastIndexOf(")")) {
+ questionMark = -1;
+ }
+ if (questionMark > 0) {
+ schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
+ }
+ if (query != null && query.length() > 0) {
+ schemeSpecificPart += "?" + query;
+ }
+ return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
+ }
+
+ /**
+ * Creates a URI with the given query, removing an previous query value from the given URI.
+ *
+ * @param uri
+ * The source URI whose existing query is replaced with the newly supplied one.
+ * @param query
+ * The new URI query string that should be appended to the given URI.
+ *
+ * @return a new URI that is a combination of the original URI and the given query string.
+ * @throws URISyntaxException
+ */
+ public static URI eraseQuery(URI uri) throws URISyntaxException {
+ return replaceQuery(uri, (String) null);
+ }
+
+ /**
+ * Given a key / value mapping, create and return a URI formatted query string that is valid
+ * and can be appended to a URI.
+ *
+ * @param options
+ * The Mapping that will create the new Query string.
+ *
+ * @return a URI formatted query string.
+ *
+ * @throws URISyntaxException
+ */
+ public static String createQueryString(Map<String, ? extends Object> options) throws URISyntaxException {
+ try {
+ if (options.size() > 0) {
+ StringBuffer rc = new StringBuffer();
+ boolean first = true;
+ for (String key : options.keySet()) {
+ if (first) {
+ first = false;
+ } else {
+ rc.append("&");
+ }
+ String value = (String) options.get(key);
+ rc.append(URLEncoder.encode(key, "UTF-8"));
+ rc.append("=");
+ rc.append(URLEncoder.encode(value, "UTF-8"));
+ }
+ return rc.toString();
+ } else {
+ return "";
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+ }
+ }
+
+ /**
+ * Get properties from a URI
+ *
+ * @param uri
+ * @return <Code>Map</Code> of properties
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> parseParameters(URI uri) throws Exception {
+ return uri.getQuery() == null ? Collections.EMPTY_MAP : parseQuery(stripPrefix(uri.getQuery(), "?"));
+ }
+
+ /**
+ * Parse properties from a named resource -eg. a URI or a simple name e.g.
+ * foo?name="fred"&size=2
+ *
+ * @param uri
+ * @return <Code>Map</Code> of properties
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> parseParameters(String uri) throws Exception {
+ return uri == null ? Collections.EMPTY_MAP : parseQuery(stripUpto(uri, '?'));
+ }
+
+ /**
+ * Get properties from a uri
+ *
+ * @param uri
+ * @return <Code>Map</Code> of properties
+ *
+ * @throws Exception
+ */
+ public static Map<String, String> parseQuery(String uri) throws Exception {
+ if (uri != null) {
+ Map<String, String> rc = new HashMap<String, String>();
+ if (uri != null) {
+ String[] parameters = uri.split("&");
+ for (int i = 0; i < parameters.length; i++) {
+ int p = parameters[i].indexOf("=");
+ if (p >= 0) {
+ String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
+ String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
+ rc.put(name, value);
+ } else {
+ rc.put(parameters[i], null);
+ }
+ }
+ }
+ return rc;
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Given a map of properties, filter out only those prefixed with the given value, the
+ * values filtered are returned in a new Map instance.
+ *
+ * @param properties
+ * The map of properties to filter.
+ * @param optionPrefix
+ * The prefix value to use when filtering.
+ *
+ * @return a filter map with only values that match the given prefix.
+ */
+ public static Map<String, String> filterProperties(Map<String, String> props, String optionPrefix) {
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ HashMap<String, String> rc = new HashMap<String, String>(props.size());
+
+ for (Iterator<String> iter = props.keySet().iterator(); iter.hasNext();) {
+ String name = iter.next();
+ if (name.startsWith(optionPrefix)) {
+ String value = props.get(name);
+ name = name.substring(optionPrefix.length());
+ rc.put(name, value);
+ iter.remove();
+ }
+ }
+
+ return rc;
+ }
+
+ /**
+ * Add bean properties to a URI
+ *
+ * @param uri
+ * @param bean
+ * @return <Code>Map</Code> of properties
+ * @throws Exception
+ */
+ public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception {
+ Map<String, String> props = PropertyUtil.getProperties(bean);
+ return PropertyUtil.addPropertiesToURI(uri, props);
+ }
+
+ /**
+ * Add properties to a URI
+ *
+ * @param uri
+ * @param props
+ * @return uri with properties on
+ * @throws Exception
+ */
+ public static String addPropertiesToURI(URI uri, Map<String, String> props) throws Exception {
+ return addPropertiesToURI(uri.toString(), props);
+ }
+
+ /**
+ * Add properties to a URI
+ *
+ * @param uri
+ * @param props
+ * @return uri with properties on
+ * @throws Exception
+ */
+ public static String addPropertiesToURI(String uri, Map<String, String> props) throws Exception {
+ String result = uri;
+ if (uri != null && props != null) {
+ StringBuilder base = new StringBuilder(stripBefore(uri, '?'));
+ Map<String, String> map = parseParameters(uri);
+ if (!map.isEmpty()) {
+ map.putAll(props);
+ }
+ if (!map.isEmpty()) {
+ base.append('?');
+ boolean first = true;
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ if (!first) {
+ base.append('&');
+ }
+ first = false;
+ base.append(entry.getKey()).append("=").append(entry.getValue());
+ }
+ result = base.toString();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Set properties on an object using the provided map. The return value indicates if all
+ * properties from the given map were set on the target object.
+ *
+ * @param target
+ * the object whose properties are to be set from the map options.
+ * @param props
+ * the properties that should be applied to the given object.
+ *
+ * @return true if all values in the props map were applied to the target object.
+ */
+ public static boolean setProperties(Object target, Map<String, String> props) {
+ if (target == null) {
+ throw new IllegalArgumentException("target was null.");
+ }
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ int setCounter = 0;
+
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (setProperty(target, entry.getKey(), entry.getValue())) {
+ setCounter++;
+ }
+ }
+
+ return setCounter == props.size();
+ }
+
+ /**
+ * Get properties from an object
+ *
+ * @param object
+ * @return <Code>Map</Code> of properties
+ * @throws Exception
+ */
+ public static Map<String, String> getProperties(Object object) throws Exception {
+ Map<String, String> props = new LinkedHashMap<String, String>();
+ BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
+ Object[] NULL_ARG = {};
+ PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+ if (propertyDescriptors != null) {
+ for (int i = 0; i < propertyDescriptors.length; i++) {
+ PropertyDescriptor pd = propertyDescriptors[i];
+ if (pd.getReadMethod() != null && !pd.getName().equals("class") && !pd.getName().equals("properties") && !pd.getName().equals("reference")) {
+ Object value = pd.getReadMethod().invoke(object, NULL_ARG);
+ if (value != null) {
+ if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof URI || value instanceof URL) {
+ props.put(pd.getName(), ("" + value));
+ } else if (value instanceof SSLContext) {
+ // ignore this one..
+ } else {
+ Map<String, String> inner = getProperties(value);
+ for (Map.Entry<String, String> entry : inner.entrySet()) {
+ props.put(pd.getName() + "." + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+ }
+ }
+ return props;
+ }
+
+ /**
+ * Find a specific property getter in a given object based on a property name.
+ *
+ * @param object
+ * the object to search.
+ * @param name
+ * the property name to search for.
+ *
+ * @return the result of invoking the specific property get method.
+ *
+ * @throws Exception if an error occurs while searching the object's bean info.
+ */
+ public static Object getProperty(Object object, String name) throws Exception {
+ BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
+ PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+ if (propertyDescriptors != null) {
+ for (int i = 0; i < propertyDescriptors.length; i++) {
+ PropertyDescriptor pd = propertyDescriptors[i];
+ if (pd.getReadMethod() != null && pd.getName().equals(name)) {
+ return pd.getReadMethod().invoke(object);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Set a property
+ *
+ * @param target
+ * @param name
+ * @param value
+ * @return true if set
+ */
+ public static boolean setProperty(Object target, String name, Object value) {
+ try {
+ int dotPos = name.indexOf(".");
+ while (dotPos >= 0) {
+ String getterName = name.substring(0, dotPos);
+ target = getProperty(target, getterName);
+ name = name.substring(dotPos + 1);
+ dotPos = name.indexOf(".");
+ }
+
+ Class<? extends Object> clazz = target.getClass();
+ Method setter = findSetterMethod(clazz, name);
+ if (setter == null) {
+ return false;
+ }
+ // If the type is null or it matches the needed type, just use the
+ // value directly
+ if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+ setter.invoke(target, new Object[] { value });
+ } else {
+ // We need to convert it
+ setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) });
+ }
+ return true;
+ } catch (Throwable ignore) {
+ return false;
+ }
+ }
+
+ /**
+ * Return a String past a prefix
+ *
+ * @param value
+ * @param prefix
+ * @return stripped
+ */
+ public static String stripPrefix(String value, String prefix) {
+ if (value.startsWith(prefix)) {
+ return value.substring(prefix.length());
+ }
+ return value;
+ }
+
+ /**
+ * Return a String from to a character
+ *
+ * @param value
+ * @param c
+ * @return stripped
+ */
+ public static String stripUpto(String value, char c) {
+ String result = null;
+ int index = value.indexOf(c);
+ if (index > 0) {
+ result = value.substring(index + 1);
+ }
+ return result;
+ }
+
+ /**
+ * Return a String up to and including character
+ *
+ * @param value
+ * @param c
+ * @return stripped
+ */
+ public static String stripBefore(String value, char c) {
+ String result = value;
+ int index = value.indexOf(c);
+ if (index > 0) {
+ result = value.substring(0, index);
+ }
+ return result;
+ }
+
+ private static Method findSetterMethod(Class<? extends Object> clazz, String name) {
+ // Build the method name.
+ name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+ Method[] methods = clazz.getMethods();
+ for (int i = 0; i < methods.length; i++) {
+ Method method = methods[i];
+ Class<? extends Object> params[] = method.getParameterTypes();
+ if (method.getName().equals(name) && params.length == 1) {
+ return method;
+ }
+ }
+ return null;
+ }
+
+ private static Object convert(Object value, Class<?> type) throws Exception {
+ PropertyEditor editor = PropertyEditorManager.findEditor(type);
+ if (editor != null) {
+ editor.setAsText(value.toString());
+ return editor.getValue();
+ }
+ if (type == URI.class) {
+ return new URI(value.toString());
+ }
+ return null;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org