You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [18/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,301 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.HashMap;
+import java.net.URISyntaxException;
+import java.net.URI;
+
+public class AMQBrokerDetails implements BrokerDetails
+{
+    private String _host;
+    private int _port;
+    private String _transport;
+
+    private HashMap<String, String> _options;
+
+    public AMQBrokerDetails()
+    {
+        _options = new HashMap<String, String>();
+    }
+
+    public AMQBrokerDetails(String url) throws URLSyntaxException
+    {
+        this();
+        // URL should be of format tcp://host:port?option='value',option='value'
+        try
+        {
+            URI connection = new URI(url);
+
+            String transport = connection.getScheme();
+
+            // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+            if (transport != null)
+            {
+                //todo this list of valid transports should be enumerated somewhere
+                if ((!(transport.equalsIgnoreCase("vm") ||
+                        transport.equalsIgnoreCase("tcp"))))
+                {
+                    if (transport.equalsIgnoreCase("localhost"))
+                    {
+                        connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+                        transport = connection.getScheme();
+                    }
+                    else
+                    {
+                        if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' )
+                        {
+                            //Then most likely we have a host:port value
+                            connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+                            transport = connection.getScheme();
+                        }
+                        else
+                        {
+                            URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+                        }
+                    }
+                }
+            }
+            else
+            {
+                //Default the transport
+                connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+                transport = connection.getScheme();
+            }
+
+            if (transport == null)
+            {
+                URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+                        " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+            }
+
+            setTransport(transport);
+
+            String host = connection.getHost();
+
+            // Fix for Java 1.5
+            if (host == null)
+            {
+                host = "";
+            }
+
+            setHost(host);
+
+            int port = connection.getPort();
+
+            if (port == -1)
+            {
+                // Another fix for Java 1.5 URI handling
+                String auth = connection.getAuthority();
+
+                if (auth != null && auth.startsWith(":"))
+                {
+                    setPort(Integer.parseInt(auth.substring(1)));
+                }
+                else
+                {
+                    setPort(DEFAULT_PORT);
+                }
+            }
+            else
+            {
+                setPort(port);
+            }
+
+            String queryString = connection.getQuery();
+
+            URLHelper.parseOptions(_options, queryString);
+
+            //Fragment is #string (not used)
+        }
+        catch (URISyntaxException uris)
+        {
+            if (uris instanceof URLSyntaxException)
+            {
+                throw (URLSyntaxException) uris;
+            }
+
+            URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+        }
+    }
+
+    public AMQBrokerDetails(String host, int port, boolean useSSL)
+    {
+        _host = host;
+        _port = port;
+
+        if (useSSL)
+        {
+            setOption(OPTIONS_SSL, "true");
+        }
+    }
+
+    public String getHost()
+    {
+        return _host;
+    }
+
+    public void setHost(String _host)
+    {
+        this._host = _host;
+    }
+
+    public int getPort()
+    {
+        return _port;
+    }
+
+    public void setPort(int _port)
+    {
+        this._port = _port;
+    }
+
+    public String getTransport()
+    {
+        return _transport;
+    }
+
+    public void setTransport(String _transport)
+    {
+        this._transport = _transport;
+    }
+
+
+    public String getOption(String key)
+    {
+        return _options.get(key);
+    }
+
+    public void setOption(String key, String value)
+    {
+        _options.put(key, value);
+    }
+
+    public long getTimeout()
+    {
+        if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT))
+        {
+            try
+            {
+                return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT));
+            }
+            catch (NumberFormatException nfe)
+            {
+                //Do nothing as we will use the default below.
+            }
+        }
+
+        return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
+    }
+
+    public void setTimeout(long timeout)
+    {
+        setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
+    }
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(_transport);
+        sb.append("://");
+
+        if (!(_transport.equalsIgnoreCase("vm")))
+        {
+            sb.append(_host);
+        }
+
+        sb.append(':');
+        sb.append(_port);
+
+        sb.append(printOptionsURL());
+
+        return sb.toString();
+    }
+
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof BrokerDetails))
+        {
+            return false;
+        }
+
+        BrokerDetails bd = (BrokerDetails) o;
+
+        return _host.equalsIgnoreCase(bd.getHost()) &&
+                (_port == bd.getPort()) &&
+                _transport.equalsIgnoreCase(bd.getTransport()) &&
+                (useSSL() == bd.useSSL());
+
+        //todo do we need to compare all the options as well?
+    }
+
+    private String printOptionsURL()
+    {
+        StringBuffer optionsURL = new StringBuffer();
+
+        optionsURL.append('?');
+
+        if (!(_options.isEmpty()))
+        {
+
+            for (String key : _options.keySet())
+            {
+                optionsURL.append(key);
+
+                optionsURL.append("='");
+
+                optionsURL.append(_options.get(key));
+
+                optionsURL.append("'");
+
+                optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+        }
+
+        //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+        optionsURL.deleteCharAt(optionsURL.length() - 1);
+
+        return optionsURL.toString();
+    }
+
+    public boolean useSSL()
+    {
+        // To be friendly to users we should be case insensitive.
+        // or simply force users to conform to OPTIONS_SSL
+        // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+        if (_options.containsKey(OPTIONS_SSL))
+        {
+            return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
+        }
+
+        return false;
+    }
+
+    public void useSSL(boolean ssl)
+    {
+        setOption(OPTIONS_SSL, Boolean.toString(ssl));
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,927 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.*;
+import org.apache.qpid.jms.Connection;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+
+public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
+{
+    private static final Logger _logger = Logger.getLogger(AMQConnection.class);
+
+    private AtomicInteger _idFactory = new AtomicInteger(0);
+
+    /**
+     * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
+     * This must be held by any child objects of this connection such as the session, producers and consumers.
+     */
+    private final Object _failoverMutex = new Object();
+
+    /**
+     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
+     * per session and we must prevent the client from opening too many. Zero means unlimited.
+     */
+    private long _maximumChannelCount;
+
+    /**
+     * The maximum size of frame supported by the server
+     */
+    private long _maximumFrameSize;
+
+    /**
+     * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
+     * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
+     * handler.
+     */
+    private AMQProtocolHandler _protocolHandler;
+
+    /**
+     * Maps from session id (Integer) to AMQSession instance
+     */
+    private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+
+    private String _clientName;
+
+    /**
+     * The user name to use for authentication
+     */
+    private String _username;
+
+    /**
+     * The password to use for authentication
+     */
+    private String _password;
+
+    /**
+     * The virtual path to connect to on the AMQ server
+     */
+    private String _virtualHost;
+
+    private ExceptionListener _exceptionListener;
+
+    private ConnectionListener _connectionListener;
+
+    private ConnectionURL _connectionURL;
+
+    /**
+     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
+     * message publication.
+     */
+    private boolean _started;
+
+    /**
+     * Policy dictating how to failover
+     */
+    private FailoverPolicy _failoverPolicy;
+
+    /*
+     * _Connected should be refactored with a suitable wait object.
+    */
+    private boolean _connected;
+
+    /*
+     * The last error code that occured on the connection. Used to return the correct exception to the client
+    */
+    private AMQException _lastAMQException = null;
+
+    public AMQConnection(String broker, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
+                username + ":" + password + "@" + clientName +
+                virtualHost + "?brokerlist='" + broker + "'"));
+    }
+
+    public AMQConnection(String host, int port, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        this(host, port, false, username, password, clientName, virtualHost);
+    }
+
+    public AMQConnection(String host, int port, boolean useSSL, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(useSSL ?
+                ConnectionURL.AMQ_PROTOCOL + "://" +
+                        username + ":" + password + "@" + clientName +
+                        virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+                        + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
+                ConnectionURL.AMQ_PROTOCOL + "://" +
+                        username + ":" + password + "@" + clientName +
+                        virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+                        + "," + ConnectionURL.OPTIONS_SSL + "='false'"
+        ));
+    }
+
+    public AMQConnection(String connection) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(connection));
+    }
+
+    public AMQConnection(ConnectionURL connectionURL) throws AMQException
+    {
+        _logger.info("Connection:" + connectionURL);
+
+        if (connectionURL == null)
+        {
+            throw new IllegalArgumentException("Connection must be specified");
+        }
+
+        _connectionURL = connectionURL;
+
+        _clientName = connectionURL.getClientName();
+        _username = connectionURL.getUsername();
+        _password = connectionURL.getPassword();
+        _virtualHost = connectionURL.getVirtualHost();
+
+        _failoverPolicy = new FailoverPolicy(connectionURL);
+
+        _protocolHandler = new AMQProtocolHandler(this);
+
+        // We are not currently connected
+        _connected = false;
+
+
+        Exception lastException = new Exception();
+        lastException.initCause(new ConnectException());
+
+        while (lastException != null && lastException.getCause() instanceof ConnectException && _failoverPolicy.failoverAllowed())
+        {
+            try
+            {
+                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+                lastException = null;
+            }
+            catch (Exception e)
+            {
+                lastException = e;
+
+                _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+                _logger.info(e);
+                _logger.info(e.getCause());
+            }
+        }
+
+        _logger.debug("Are we connected:" + _connected);
+
+        // Then the Failover Thread will handle conneciton
+        if (_failoverPolicy.failoverAllowed())
+        {
+            //TODO this needs to be redone so that we are not spinning.
+            // A suitable object should be set that is then waited on
+            // and only notified when a connection is made or when
+            // the AMQConnection gets closed.
+            while (!_connected && !_closed.get())
+            {
+                try
+                {
+                    _logger.debug("Sleeping.");
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException ie)
+                {
+                    _logger.debug("Woken up.");
+                }
+            }
+            if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
+            {
+                if (_lastAMQException != null)
+                {
+                    throw _lastAMQException;
+                }
+            }
+        }
+        else
+        {
+            String message = null;
+
+            if (lastException != null)
+            {
+                if (lastException.getCause() != null)
+                {
+                    message = lastException.getCause().getMessage();
+                }
+                else
+                {
+                    message = lastException.getMessage();
+                }
+            }
+
+            if (message == null || message.equals(""))
+            {
+                message = "Unable to Connect";
+            }
+
+            AMQException e = new AMQConnectionException(message);
+
+            if (lastException != null)
+            {
+                if (lastException instanceof UnresolvedAddressException)
+                {
+                    e = new AMQUnresolvedAddressException(message);
+                }
+                e.initCause(lastException);
+            }
+
+            throw e;
+        }
+    }
+
+    protected AMQConnection(String username, String password, String clientName, String virtualHost)
+    {
+        _clientName = clientName;
+        _username = username;
+        _password = password;
+        _virtualHost = virtualHost;
+    }
+
+    private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        try
+        {
+            TransportConnection.getInstance().connect(_protocolHandler, brokerDetail);
+            // this blocks until the connection has been set up or when an error
+            // has prevented the connection being set up
+            _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+            _failoverPolicy.attainedConnection();
+
+            //Again this should be changed to a suitable notify
+            _connected = true;
+        }
+        catch (AMQException e)
+        {
+            _lastAMQException = e;
+            throw e;
+        }
+    }
+
+    public boolean attemptReconnection(String host, int port, boolean useSSL)
+    {
+        BrokerDetails bd = new AMQBrokerDetails(host, port, useSSL);
+
+        _failoverPolicy.setBroker(bd);
+
+        try
+        {
+            makeBrokerConnection(bd);
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.info("Unable to connect to broker at " + bd);
+            attemptReconnection();
+        }
+        return false;
+    }
+
+    public boolean attemptReconnection()
+    {
+        while (_failoverPolicy.failoverAllowed())
+        {
+            try
+            {
+                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+                return true;
+            }
+            catch (Exception e)
+            {
+                if (!(e instanceof AMQException))
+                {
+                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+                }
+                else
+                {
+                    _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+                }
+            }
+        }
+
+        //connection unsuccessful
+        return false;
+    }
+
+    /**
+     * Get the details of the currently active broker
+     *
+     * @return null if no broker is active (i.e. no successful connection has been made, or
+     *         the BrokerDetail instance otherwise
+     */
+    public BrokerDetails getActiveBrokerDetails()
+    {
+        return _failoverPolicy.getCurrentBrokerDetails();
+    }
+
+    public boolean failoverAllowed()
+    {
+        return _failoverPolicy.failoverAllowed();
+    }
+
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH);
+    }
+
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+                                                     final int prefetch) throws JMSException
+    {
+        checkNotClosed();
+        if (channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_maximumChannelCount);
+        }
+        else
+        {
+            return (org.apache.qpid.jms.Session) new FailoverSupport()
+            {
+                public Object operation() throws JMSException
+                {
+                    int channelId = _idFactory.incrementAndGet();
+
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Write channel open frame for channel id " + channelId);
+                    }
+
+                    // We must create the session and register it before actually sending the frame to the server to
+                    // open it, so that there is no window where we could receive data on the channel and not be set
+                    // up to handle it appropriately.
+                    AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
+                            prefetch);
+                    _protocolHandler.addSessionByChannel(channelId, session);
+                    registerSession(channelId, session);
+
+                    boolean success = false;
+                    try
+                    {
+                        createChannelOverWire(channelId, prefetch, transacted);
+                        success = true;
+                    }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error creating session: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
+                    finally
+                    {
+                        if (!success) {
+                            _protocolHandler.removeSessionByChannel(channelId);
+                            deregisterSession(channelId);
+                        }
+                    }
+
+                    if (_started)
+                    {
+                        session.start();
+                    }
+                    return session;
+                }
+            }.execute(this);
+        }
+    }
+
+    private void createChannelOverWire(int channelId, int prefetch, boolean transacted)
+            throws AMQException
+    {
+        _protocolHandler.syncWrite(
+                ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+        _protocolHandler.syncWrite(
+                BasicQosBody.createAMQFrame(channelId, 0, prefetch, false),
+                BasicQosOkBody.class);
+
+        if (transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing TxSelect for " + channelId);
+            }
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+        }
+    }
+
+    private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetch, transacted);
+        }
+        catch (AMQException e)
+        {
+            _protocolHandler.removeSessionByChannel(channelId);
+            deregisterSession(channelId);
+            throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+        }
+    }
+
+
+    public void setFailoverPolicy(FailoverPolicy policy)
+    {
+        _failoverPolicy = policy;
+    }
+
+    public FailoverPolicy getFailoverPolicy()
+    {
+        return _failoverPolicy;
+    }
+
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        return (QueueSession) createSession(transacted, acknowledgeMode);
+    }
+
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        return (TopicSession) createSession(transacted, acknowledgeMode);
+    }
+
+    private boolean channelLimitReached()
+    {
+        return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount;
+    }
+
+    public String getClientID() throws JMSException
+    {
+        checkNotClosed();
+        return _clientName;
+    }
+
+    public void setClientID(String clientID) throws JMSException
+    {
+        checkNotClosed();
+        _clientName = clientID;
+    }
+
+    public ConnectionMetaData getMetaData() throws JMSException
+    {
+        checkNotClosed();
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public ExceptionListener getExceptionListener() throws JMSException
+    {
+        checkNotClosed();
+        return _exceptionListener;
+    }
+
+    public void setExceptionListener(ExceptionListener listener) throws JMSException
+    {
+        checkNotClosed();
+        _exceptionListener = listener;
+    }
+
+    /**
+     * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
+     * and is not thread safe (which is legal according to the JMS specification).
+     *
+     * @throws JMSException
+     */
+    public void start() throws JMSException
+    {
+        checkNotClosed();
+        if (!_started)
+        {
+            final Iterator it = _sessions.entrySet().iterator();
+            while (it.hasNext())
+            {
+                final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+                s.start();
+            }
+            _started = true;
+        }
+    }
+
+    public void stop() throws JMSException
+    {
+        checkNotClosed();
+
+        if (_started)
+        {
+            for (Iterator i = _sessions.values().iterator(); i.hasNext();)
+            {
+                ((AMQSession) i.next()).stop();
+            }
+            _started = false;
+        }
+    }
+
+    public void close() throws JMSException
+    {
+        synchronized (getFailoverMutex())
+        {
+            if (!_closed.getAndSet(true))
+            {
+                try
+                {
+                    closeAllSessions(null);
+                    _protocolHandler.closeConnection();
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSException("Error closing connection: " + e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks all sessions and their children as closed without sending any protocol messages. Useful when
+     * you need to mark objects "visible" in userland as closed after failover or other significant event that
+     * impacts the connection.
+     * <p/>
+     * The caller must hold the failover mutex before calling this method.
+     */
+    private void markAllSessionsClosed()
+    {
+        final LinkedList sessionCopy = new LinkedList(_sessions.values());
+        final Iterator it = sessionCopy.iterator();
+        while (it.hasNext())
+        {
+            final AMQSession session = (AMQSession) it.next();
+
+            session.markClosed();
+        }
+        _sessions.clear();
+    }
+
+    /**
+     * Close all the sessions, either due to normal connection closure or due to an error occurring.
+     *
+     * @param cause if not null, the error that is causing this shutdown
+     *              <p/>
+     *              The caller must hold the failover mutex before calling this method.
+     */
+    private void closeAllSessions(Throwable cause) throws JMSException
+    {
+        final LinkedList sessionCopy = new LinkedList(_sessions.values());
+        final Iterator it = sessionCopy.iterator();
+        JMSException sessionException = null;
+        while (it.hasNext())
+        {
+            final AMQSession session = (AMQSession) it.next();
+            if (cause != null)
+            {
+                session.closed(cause);
+            }
+            else
+            {
+                try
+                {
+                    session.close();
+                }
+                catch (JMSException e)
+                {
+                    _logger.error("Error closing session: " + e);
+                    sessionException = e;
+                }
+            }
+        }
+        _sessions.clear();
+        if (sessionException != null)
+        {
+            throw sessionException;
+        }
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+                                                              String messageSelector, ServerSessionPool sessionPool,
+                                                              int maxMessages)
+            throws JMSException
+    {
+        // TODO Auto-generated method stub
+        checkNotClosed();
+        return null;
+    }
+
+    public long getMaximumChannelCount()
+    {
+        checkNotClosed();
+        return _maximumChannelCount;
+    }
+
+    public void setConnectionListener(ConnectionListener listener)
+    {
+        _connectionListener = listener;
+    }
+
+    public ConnectionListener getConnectionListener()
+    {
+        return _connectionListener;
+    }
+
+    public void setMaximumChannelCount(long maximumChannelCount)
+    {
+        checkNotClosed();
+        _maximumChannelCount = maximumChannelCount;
+    }
+
+    public void setMaximumFrameSize(long frameMax)
+    {
+        _maximumFrameSize = frameMax;
+    }
+
+    public long getMaximumFrameSize()
+    {
+        return _maximumFrameSize;
+    }
+
+    public Map getSessions()
+    {
+        return _sessions;
+    }
+
+    public String getUsername()
+    {
+        return _username;
+    }
+
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    public String getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    public AMQProtocolHandler getProtocolHandler()
+    {
+        return _protocolHandler;
+    }
+
+    public void bytesSent(long writtenBytes)
+    {
+        if (_connectionListener != null)
+        {
+            _connectionListener.bytesSent(writtenBytes);
+        }
+    }
+
+    public void bytesReceived(long receivedBytes)
+    {
+        if (_connectionListener != null)
+        {
+            _connectionListener.bytesReceived(receivedBytes);
+        }
+    }
+
+    /**
+     * Fire the preFailover event to the registered connection listener (if any)
+     *
+     * @param redirect true if this is the result of a redirect request rather than a connection error
+     * @return true if no listener or listener does not veto change
+     */
+    public boolean firePreFailover(boolean redirect)
+    {
+        boolean proceed = true;
+        if (_connectionListener != null)
+        {
+            proceed = _connectionListener.preFailover(redirect);
+        }
+        return proceed;
+    }
+
+    /**
+     * Fire the preResubscribe event to the registered connection listener (if any). If the listener
+     * vetoes resubscription then all the sessions are closed.
+     *
+     * @return true if no listener or listener does not veto resubscription.
+     * @throws JMSException
+     */
+    public boolean firePreResubscribe() throws JMSException
+    {
+        if (_connectionListener != null)
+        {
+            boolean resubscribe = _connectionListener.preResubscribe();
+            if (!resubscribe)
+            {
+                markAllSessionsClosed();
+            }
+            return resubscribe;
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    /**
+     * Fires a failover complete event to the registered connection listener (if any).
+     */
+    public void fireFailoverComplete()
+    {
+        if (_connectionListener != null)
+        {
+            _connectionListener.failoverComplete();
+        }
+    }
+
+    /**
+     * In order to protect the consistency of the connection and its child sessions, consumers and producers,
+     * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+     *
+     * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
+     */
+    public final Object getFailoverMutex()
+    {
+        return _failoverMutex;
+    }
+
+    /**
+     * If failover is taking place this will block until it has completed. If failover
+     * is not taking place it will return immediately.
+     *
+     * @throws InterruptedException
+     */
+    public void blockUntilNotFailingOver() throws InterruptedException
+    {
+        _protocolHandler.blockUntilNotFailingOver();
+    }
+
+    /**
+     * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
+     * This method sends the exception to a JMS exception listener, if configured, and
+     * propagates the exception to sessions, which in turn will propagate to consumers.
+     * This allows synchronous consumers to have exceptions thrown to them.
+     *
+     * @param cause the exception
+     */
+    public void exceptionReceived(Throwable cause)
+    {
+
+        _logger.debug("Connection Close done by:" + Thread.currentThread().getName());
+        _logger.debug("exceptionReceived is ", cause);
+
+        final JMSException je;
+        if (cause instanceof JMSException)
+        {
+            je = (JMSException) cause;
+        }
+        else
+        {
+            je = new JMSException("Exception thrown against " + toString() + ": " + cause);
+            if (cause instanceof Exception)
+            {
+                je.setLinkedException((Exception) cause);
+            }
+        }
+
+        // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
+        // so that any generic client code that tries to close the connection will not mess up this error
+        // handling sequence
+        if (cause instanceof IOException)
+        {
+            _closed.set(true);
+        }
+
+        if (_exceptionListener != null)
+        {
+            _exceptionListener.onException(je);
+        }
+
+        if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException))
+        {
+            try
+            {
+                _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+                _closed.set(true);
+                closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+            }
+            catch (JMSException e)
+            {
+                _logger.error("Error closing all sessions: " + e, e);
+            }
+
+        }
+        else
+        {
+            _logger.info("Not a hard-error connection not closing.");
+        }
+    }
+
+    void registerSession(int channelId, AMQSession session)
+    {
+        _sessions.put(channelId, session);
+    }
+
+    void deregisterSession(int channelId)
+    {
+        _sessions.remove(channelId);
+    }
+
+    /**
+     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+     * The caller must hold the failover mutex before calling this method.
+     */
+    public void resubscribeSessions() throws AMQException
+    {
+        ArrayList sessions = new ArrayList(_sessions.values());
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove?
+        for (Iterator it = sessions.iterator(); it.hasNext();)
+        {
+            AMQSession s = (AMQSession) it.next();
+            _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetch(), s.getTransacted());
+            s.resubscribe();
+        }
+    }
+
+    public String toString()
+    {
+        StringBuffer buf = new StringBuffer("AMQConnection:\n");
+        if (_failoverPolicy.getCurrentBrokerDetails() == null)
+        {
+            buf.append("No active broker connection");
+        }
+        else
+        {
+            BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
+            buf.append("Host: ").append(String.valueOf(bd.getHost()));
+            buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
+        }
+        buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
+        buf.append("\nClient ID: ").append(String.valueOf(_clientName));
+        buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size());
+        return buf.toString();
+    }
+
+    public String toURL()
+    {
+        return _connectionURL.toString();
+    }
+
+    public Reference getReference() throws NamingException
+    {
+        return new Reference(
+                AMQConnection.class.getName(),
+                new StringRefAddr(AMQConnection.class.getName(), toURL()),
+                AMQConnectionFactory.class.getName(),
+                null);          // factory location
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,358 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.jms.ConnectionURL;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.naming.*;
+import javax.naming.spi.ObjectFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Hashtable;
+
+
+public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable
+{
+    private String _host;
+    private int _port;
+    private String _defaultUsername;
+    private String _defaultPassword;
+    private String _virtualPath;
+
+    private ConnectionURL _connectionDetails;
+
+
+    public AMQConnectionFactory()
+    {
+    }
+
+    public AMQConnectionFactory(String url) throws URLSyntaxException
+    {
+        _connectionDetails = new AMQConnectionURL(url);
+    }
+
+    public AMQConnectionFactory(ConnectionURL url)
+    {
+        _connectionDetails = url;
+    }
+
+    public AMQConnectionFactory(String broker, String username, String password,
+                                String clientName, String virtualHost) throws URLSyntaxException
+    {
+        this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
+                username + ":" + password + "@" + clientName +
+                virtualHost + "?brokerlist='" + broker + "'"));
+    }
+
+    public AMQConnectionFactory(String host, int port, String virtualPath)
+    {
+        this(host, port, "guest", "guest", virtualPath);
+    }
+
+    public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword,
+                                String virtualPath)
+    {
+        _host = host;
+        _port = port;
+        _defaultUsername = defaultUsername;
+        _defaultPassword = defaultPassword;
+        _virtualPath = virtualPath;
+
+//todo when setting Host/Port has been resolved then we can use this otherwise those methods won't work with the following line.
+//        _connectionDetails = new AMQConnectionURL(
+//                ConnectionURL.AMQ_PROTOCOL + "://" +
+//                        _defaultUsername + ":" + _defaultPassword + "@" +
+//                        virtualPath + "?brokerlist='tcp://" + host + ":" + port + "'");
+    }
+
+    /**
+     * @return The _defaultPassword.
+     */
+    public final String getDefaultPassword(String password)
+    {
+        if (_connectionDetails != null)
+        {
+            return _connectionDetails.getPassword();
+        }
+        else
+        {
+            return _defaultPassword;
+        }
+    }
+
+    /**
+     * @param password The _defaultPassword to set.
+     */
+    public final void setDefaultPassword(String password)
+    {
+        if (_connectionDetails != null)
+        {
+            _connectionDetails.setPassword(password);
+        }
+        _defaultPassword = password;
+    }
+
+    /**
+     * @return The _defaultPassword.
+     */
+    public final String getDefaultUsername(String password)
+    {
+        if (_connectionDetails != null)
+        {
+            return _connectionDetails.getUsername();
+        }
+        else
+        {
+            return _defaultUsername;
+        }
+    }
+
+    /**
+     * @param username The _defaultUsername to set.
+     */
+    public final void setDefaultUsername(String username)
+    {
+        if (_connectionDetails != null)
+        {
+            _connectionDetails.setUsername(username);
+        }
+        _defaultUsername = username;
+    }
+
+    /**
+     * @return The _host .
+     */
+    public final String getHost()
+    {
+        //todo this doesn't make sense in a multi broker URL as we have no current as that is done by AMQConnection
+        return _host;
+    }
+
+    /**
+     * @param host The _host to set.
+     */
+    public final void setHost(String host)
+    {
+        //todo if _connectionDetails is set then run _connectionDetails.addBrokerDetails()
+        // Should perhaps have this method changed to setBroker(host,port)
+        _host = host;
+    }
+
+    /**
+     * @return _port The _port to set.
+     */
+    public final int getPort()
+    {
+        //todo see getHost
+        return _port;
+    }
+
+    /**
+     * @param port The port to set.
+     */
+    public final void setPort(int port)
+    {
+        //todo see setHost
+        _port = port;
+    }
+
+    /**
+     * @return he _virtualPath.
+     */
+    public final String getVirtualPath()
+    {
+        if (_connectionDetails != null)
+        {
+            return _connectionDetails.getVirtualHost();
+        }
+        else
+        {
+            return _virtualPath;
+        }
+    }
+
+    /**
+     * @param path The _virtualPath to set.
+     */
+    public final void setVirtualPath(String path)
+    {
+        if (_connectionDetails != null)
+        {
+            _connectionDetails.setVirtualHost(path);
+        }
+
+        _virtualPath = path;
+    }
+
+    static String getUniqueClientID()
+    {
+        try
+        {
+            InetAddress addr = InetAddress.getLocalHost();
+            return addr.getHostName() + System.currentTimeMillis();
+        }
+        catch (UnknownHostException e)
+        {
+            return null;
+        }
+    }
+
+    public Connection createConnection() throws JMSException
+    {
+        try
+        {
+            if (_connectionDetails != null)
+            {
+                if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
+                {
+                    _connectionDetails.setClientName(getUniqueClientID());
+                }
+                return new AMQConnection(_connectionDetails);
+            }
+            else
+            {
+                return new AMQConnection(_host, _port, _defaultUsername, _defaultPassword, getUniqueClientID(),
+                        _virtualPath);
+            }
+        }
+        catch (Exception e)
+        {
+            JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+            jmse.setLinkedException(e);
+            throw jmse;
+        }
+
+
+    }
+
+    public Connection createConnection(String userName, String password) throws JMSException
+    {
+        try
+        {
+            return new AMQConnection(_host, _port, userName, password, getUniqueClientID(), _virtualPath);
+        }
+        catch (Exception e)
+        {
+            JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+            jmse.setLinkedException(e);
+            throw jmse;
+        }
+    }
+
+    public QueueConnection createQueueConnection() throws JMSException
+    {
+        return (QueueConnection) createConnection();
+    }
+
+    public QueueConnection createQueueConnection(String username, String password) throws JMSException
+    {
+        return (QueueConnection) createConnection(username, password);
+    }
+
+    public TopicConnection createTopicConnection() throws JMSException
+    {
+        return (TopicConnection) createConnection();
+    }
+
+    public TopicConnection createTopicConnection(String username, String password) throws JMSException
+    {
+        return (TopicConnection) createConnection(username, password);
+    }
+
+
+    public ConnectionURL getConnectionURL()
+    {
+        return _connectionDetails;
+    }
+
+    /**
+     * JNDI interface to create objects from References.
+     *
+     * @param obj  The Reference from JNDI
+     * @param name
+     * @param ctx
+     * @param env
+     * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
+     * @throws Exception
+     */
+    public Object getObjectInstance(Object obj, Name name, Context ctx,
+                                    Hashtable env) throws Exception
+    {
+        if (obj instanceof Reference)
+        {
+            Reference ref = (Reference) obj;
+
+            if (ref.getClassName().equals(AMQConnection.class.getName()))
+            {
+                RefAddr addr = ref.get(AMQConnection.class.getName());
+
+                if (addr != null)
+                {
+                    return new AMQConnection((String) addr.getContent());
+                }
+            }
+
+            if (ref.getClassName().equals(AMQQueue.class.getName()))
+            {
+                RefAddr addr = ref.get(AMQQueue.class.getName());
+
+                if (addr != null)
+                {
+                    return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName());
+                }
+            }
+
+            if (ref.getClassName().equals(AMQTopic.class.getName()))
+            {
+                RefAddr addr = ref.get(AMQTopic.class.getName());
+
+                if (addr != null)
+                {
+                    return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName());
+                }
+            }
+
+            if (ref.getClassName().equals(AMQConnectionFactory.class.getName()))
+            {
+                RefAddr addr = ref.get(AMQConnectionFactory.class.getName());
+
+                if (addr != null)
+                {
+                    return new AMQConnectionFactory(new AMQConnectionURL((String) addr.getContent()));
+                }
+            }
+
+        }
+        return null;
+    }
+
+
+    public Reference getReference() throws NamingException
+    {
+        return new Reference(
+                AMQConnectionFactory.class.getName(),
+                new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+                AMQConnectionFactory.class.getName(),
+                null);          // factory location
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,399 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AMQConnectionURL implements ConnectionURL
+{
+    private String _url;
+    private String _failoverMethod;
+    private HashMap<String, String> _failoverOptions;
+    private HashMap<String, String> _options;
+    private List<BrokerDetails> _brokers;
+    private String _clientName;
+    private String _username;
+    private String _password;
+    private String _virtualHost;
+
+    public AMQConnectionURL(String fullURL) throws URLSyntaxException
+    {
+        _url = fullURL;
+        _options = new HashMap<String, String>();
+        _brokers = new LinkedList<BrokerDetails>();
+        _failoverOptions = new HashMap<String, String>();
+
+        // Connection URL format
+        //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+        // Options are of course optional except for requiring a single broker in the broker list.
+        try
+        {
+            URI connection = new URI(fullURL);
+
+            if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+            {
+                throw new URISyntaxException(fullURL, "Not an AMQP URL");
+            }
+
+            if (connection.getHost() == null || connection.getHost().equals(""))
+            {
+                String uid = AMQConnectionFactory.getUniqueClientID();
+                if (uid == null)
+                {
+                    URLHelper.parseError(-1, "Client Name not specified", fullURL);
+                }
+                else
+                {
+                    setClientName(uid);
+                }
+
+            }
+            else
+            {
+                setClientName(connection.getHost());
+            }
+
+            String userInfo = connection.getUserInfo();
+
+            if (userInfo == null)
+            {
+                //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+                userInfo = connection.getAuthority();
+
+                if (userInfo != null)
+                {
+                    int atIndex = userInfo.indexOf('@');
+
+                    if (atIndex != -1)
+                    {
+                        userInfo = userInfo.substring(0, atIndex);
+                    }
+                    else
+                    {
+                        userInfo = null;
+                    }
+                }
+
+            }
+
+            if (userInfo == null)
+            {
+                URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+                        "User information not found on url", fullURL);
+            }
+            else
+            {
+                parseUserInfo(userInfo);
+            }
+            String virtualHost = connection.getPath();
+
+            if (virtualHost != null && (!virtualHost.equals("")))
+            {
+                setVirtualHost(virtualHost);
+            }
+            else
+            {
+                int authLength = connection.getAuthority().length();
+                int start = AMQ_PROTOCOL.length() + 3;
+                int testIndex = start + authLength;
+                if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+                {
+                    URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+                }
+                else
+                {
+                    URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+                }
+
+            }
+
+
+            URLHelper.parseOptions(_options, connection.getQuery());
+
+            processOptions();
+
+            //Fragment is #string (not used)
+            //System.out.println(connection.getFragment());
+
+        }
+        catch (URISyntaxException uris)
+        {
+            if (uris instanceof URLSyntaxException)
+            {
+                throw (URLSyntaxException) uris;
+            }
+
+            int slash = fullURL.indexOf("\\");
+
+            if (slash == -1)
+            {
+                URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+            }
+            else
+            {
+                if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+                {
+                    URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+                }
+                else
+                {
+                    URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+                }
+            }
+
+        }
+    }
+
+    private void parseUserInfo(String userinfo) throws URLSyntaxException
+    {
+        //user info = user:pass
+
+        int colonIndex = userinfo.indexOf(':');
+
+        if (colonIndex == -1)
+        {
+            URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+                    "Null password in user information not allowed.", _url);
+        }
+        else
+        {
+            setUsername(userinfo.substring(0, colonIndex));
+            setPassword(userinfo.substring(colonIndex + 1));
+        }
+
+    }
+
+    private void processOptions() throws URLSyntaxException
+    {
+        if (_options.containsKey(OPTIONS_BROKERLIST))
+        {
+            String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+            //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+            StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+            while (st.hasMoreTokens())
+            {
+                String broker = st.nextToken();
+
+                _brokers.add(new AMQBrokerDetails(broker));
+            }
+
+            _options.remove(OPTIONS_BROKERLIST);
+        }
+
+        if (_options.containsKey(OPTIONS_FAILOVER))
+        {
+            String failover = _options.get(OPTIONS_FAILOVER);
+
+            // failover='method?option='value',option='value''
+
+            int methodIndex = failover.indexOf('?');
+
+            if (methodIndex > -1)
+            {
+                _failoverMethod = failover.substring(0, methodIndex);
+                URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+            }
+            else
+            {
+                _failoverMethod = failover;
+            }
+
+            _options.remove(OPTIONS_FAILOVER);
+        }
+    }
+
+    public String getURL()
+    {
+        return _url;
+    }
+
+    public String getFailoverMethod()
+    {
+        return _failoverMethod;
+    }
+
+    public String getFailoverOption(String key)
+    {
+        return _failoverOptions.get(key);
+    }
+
+    public void setFailoverOption(String key, String value)
+    {
+        _failoverOptions.put(key, value);
+    }
+
+    public int getBrokerCount()
+    {
+        return _brokers.size();
+    }
+
+    public BrokerDetails getBrokerDetails(int index)
+    {
+        if (index < _brokers.size())
+        {
+            return _brokers.get(index);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public void addBrokerDetails(BrokerDetails broker)
+    {
+        if (!(_brokers.contains(broker)))
+        {
+            _brokers.add(broker);
+        }
+    }
+
+    public List<BrokerDetails> getAllBrokerDetails()
+    {
+        return _brokers;
+    }
+
+    public String getClientName()
+    {
+        return _clientName;
+    }
+
+    public void setClientName(String clientName)
+    {
+        _clientName = clientName;
+    }
+
+    public String getUsername()
+    {
+        return _username;
+    }
+
+    public void setUsername(String username)
+    {
+        _username = username;
+    }
+
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    public void setPassword(String password)
+    {
+        _password = password;
+    }
+
+    public String getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    public void setVirtualHost(String virtuaHost)
+    {
+        _virtualHost = virtuaHost;
+    }
+
+    public String getOption(String key)
+    {
+        return _options.get(key);
+    }
+
+    public void setOption(String key, String value)
+    {
+        _options.put(key, value);
+    }
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(AMQ_PROTOCOL);
+        sb.append("://");
+
+        if (_username != null)
+        {
+            sb.append(_username);
+
+            if (_password != null)
+            {
+                sb.append(':');
+                sb.append(_password);
+            }
+
+            sb.append('@');
+        }
+
+        sb.append(_clientName);
+
+        sb.append(_virtualHost);
+
+        sb.append(optionsToString());
+
+        return sb.toString();
+    }
+
+    private String optionsToString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append("?" + OPTIONS_BROKERLIST + "='");
+
+        for (BrokerDetails service : _brokers)
+        {
+            sb.append(service.toString());
+            sb.append(';');
+        }
+
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append("'");
+
+        if (_failoverMethod != null)
+        {
+            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            sb.append(OPTIONS_FAILOVER + "='");
+            sb.append(_failoverMethod);
+            sb.append(URLHelper.printOptions(_failoverOptions));
+            sb.append("'");
+        }
+
+        return sb.toString();
+    }
+
+
+    public static void main(String[] args) throws URLSyntaxException
+    {
+
+        String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+        //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+
+        ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
+
+        System.out.println(url2);
+        System.out.println(connectionurl2);
+
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,282 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+import javax.jms.Destination;
+
+
+public abstract class AMQDestination implements Destination, Referenceable
+{
+    protected final String _exchangeName;
+
+    protected final String _exchangeClass;
+
+    protected final String _destinationName;
+
+    protected boolean _isDurable;
+
+    protected final boolean _isExclusive;
+
+    protected final boolean _isAutoDelete;
+
+    protected String _queueName;
+
+    protected AMQDestination(String url) throws URLSyntaxException
+    {
+        this(new AMQBindingURL(url));
+    }
+
+    protected AMQDestination(BindingURL binding)
+    {
+        _exchangeName = binding.getExchangeName();
+        _exchangeClass = binding.getExchangeClass();
+        _destinationName = binding.getDestinationName();
+
+        _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+        _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+        _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+        _queueName = binding.getQueueName();
+    }
+
+    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+    {
+        this(exchangeName, exchangeClass, destinationName, false, false, queueName);
+    }
+
+    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+    {
+        this(exchangeName, exchangeClass, destinationName, false, false, null);
+    }
+
+    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
+                             boolean isAutoDelete, String queueName)
+    {
+        if (destinationName == null)
+        {
+            throw new IllegalArgumentException("Destination name must not be null");
+        }
+        if (exchangeName == null)
+        {
+            throw new IllegalArgumentException("Exchange name must not be null");
+        }
+        if (exchangeClass == null)
+        {
+            throw new IllegalArgumentException("Exchange class must not be null");
+        }
+        _exchangeName = exchangeName;
+        _exchangeClass = exchangeClass;
+        _destinationName = destinationName;
+        _isExclusive = isExclusive;
+        _isAutoDelete = isAutoDelete;
+        _queueName = queueName;
+    }
+
+    public abstract String getEncodedName();
+
+    public boolean isDurable()
+    {
+        return _isDurable;
+    }
+
+    public String getExchangeName()
+    {
+        return _exchangeName;
+    }
+
+    public String getExchangeClass()
+    {
+        return _exchangeClass;
+    }
+
+    public boolean isTopic()
+    {
+        return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName);
+    }
+
+    public boolean isQueue()
+    {
+        return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
+    }
+
+    public String getDestinationName()
+    {
+        return _destinationName;
+    }
+
+    public String getQueueName()
+    {
+        return _queueName;
+    }
+
+    public void setQueueName(String queueName)
+    {
+        _queueName = queueName;
+    }
+
+    public abstract String getRoutingKey();
+
+    public boolean isExclusive()
+    {
+        return _isExclusive;
+    }
+
+    public boolean isAutoDelete()
+    {
+        return _isAutoDelete;
+    }
+
+    public abstract boolean isNameRequired();
+
+    public String toString()
+    {
+        return toURL();
+
+        /*
+        return "Destination: " + _destinationName + ", " +
+               "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
+               ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
+               ", AutoDelete: " + _isAutoDelete + ", Routing  Key: " + getRoutingKey();
+         */
+    }
+
+    public String toURL()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(_exchangeClass);
+        sb.append("://");
+        sb.append(_exchangeName);
+
+        sb.append("/");
+
+        if (_destinationName != null)
+        {
+            sb.append(_destinationName);
+        }
+
+        sb.append("/");
+
+        if (_queueName != null)
+        {
+            sb.append(_queueName);
+        }
+
+        sb.append("?");
+
+        if (_isDurable)
+        {
+            sb.append(BindingURL.OPTION_DURABLE);
+            sb.append("='true'");
+            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+        }
+
+        if (_isExclusive)
+        {
+            sb.append(BindingURL.OPTION_EXCLUSIVE);
+            sb.append("='true'");
+            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+        }
+
+        if (_isAutoDelete)
+        {
+            sb.append(BindingURL.OPTION_AUTODELETE);
+            sb.append("='true'");
+            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+        }
+
+        //remove the last char '?' if there is no options , ',' if there are.
+        sb.deleteCharAt(sb.length() - 1);
+
+        return sb.toString();
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final AMQDestination that = (AMQDestination) o;
+
+        if (!_destinationName.equals(that._destinationName))
+        {
+            return false;
+        }
+        if (!_exchangeClass.equals(that._exchangeClass))
+        {
+            return false;
+        }
+        if (!_exchangeName.equals(that._exchangeName))
+        {
+            return false;
+        }
+        if ((_queueName == null && that._queueName != null) ||
+                (_queueName != null && !_queueName.equals(that._queueName)))
+        {
+            return false;
+        }
+        if (_isExclusive != that._isExclusive)
+        {
+            return false;
+        }
+        if (_isAutoDelete != that._isAutoDelete)
+        {
+            return false;
+        }
+        return true;
+    }
+
+    public int hashCode()
+    {
+        int result;
+        result = _exchangeName.hashCode();
+        result = 29 * result + _exchangeClass.hashCode();
+        result = 29 * result + _destinationName.hashCode();
+        if (_queueName != null)
+        {
+            result = 29 * result + _queueName.hashCode();
+        }
+        result = result * (_isExclusive ? 13 : 7);
+        result = result * (_isAutoDelete ? 13 : 7);
+        return result;
+    }
+
+    public Reference getReference() throws NamingException
+    {
+        return new Reference(
+                this.getClass().getName(),
+                new StringRefAddr(this.getClass().getName(), toURL()),
+                AMQConnectionFactory.class.getName(),
+                null);          // factory location
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+/**
+ * A destination backed by a headers exchange
+ */
+public class AMQHeadersExchange extends AMQDestination
+{
+    public AMQHeadersExchange(String queueName)
+    {
+        super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
+    }
+
+    public String getEncodedName()
+    {
+        return getDestinationName();
+    }
+
+    public String getRoutingKey()
+    {
+        return getDestinationName();
+    }
+
+    public boolean isNameRequired()
+    {
+        //Not sure what the best approach is here, probably to treat this like a topic
+        //and allow server to generate names. As it is AMQ specific it doesn't need to
+        //fit the JMS API expectations so this is not as yet critical.
+        return false;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.protocol.AMQConstant;
+
+
+public class AMQNoConsumersException extends AMQUndeliveredException
+{
+    public AMQNoConsumersException(String msg, Object bounced)
+    {
+        super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced);
+    }
+
+
+}
+
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.protocol.AMQConstant;
+
+
+public class AMQNoRouteException extends AMQUndeliveredException
+{
+    public AMQNoRouteException(String msg, Object bounced)
+    {
+        super(AMQConstant.NO_ROUTE.getCode(), msg, bounced);
+    }
+
+
+}
+
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.jms.Queue;
+
+public class AMQQueue extends AMQDestination implements Queue
+{
+
+    /**
+     * Create a reference to a non temporary queue using a BindingURL object.
+     * Note this does not actually imply the queue exists.
+     * @param binding a BindingURL object
+     */
+    public AMQQueue(BindingURL binding)
+    {
+         super(binding);
+    }
+
+    /**
+     * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+     * @param name the name of the queue
+     */
+    public AMQQueue(String name)
+    {
+        this(name, false);
+    }
+
+    /**
+     * Create a queue with a specified name.
+     *
+     * @param name the destination name (used in the routing key)
+     * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
+     * and exclusive
+     */
+    public AMQQueue(String name, boolean temporary)
+    {
+        // queue name is set to null indicating that the broker assigns a name in the case of temporary queues
+        // temporary queues are typically used as response queues
+        this(name, temporary?null:name, temporary, temporary);
+        _isDurable = !temporary;
+    }
+
+    /**
+     * Create a reference to a queue. Note this does not actually imply the queue exists.
+     * @param destinationName the queue name
+     * @param queueName the queue name
+     * @param exclusive true if the queue should only permit a single consumer
+     * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
+     */
+    public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+    {
+        super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
+              autoDelete, queueName);
+    }
+
+    public String getEncodedName()
+    {
+        return 'Q' + getQueueName();
+    }
+
+    public String getRoutingKey()
+    {
+        return getQueueName();
+    }
+
+    public boolean isNameRequired()
+    {
+        //If the name is null, we require one to be generated by the client so that it will#
+        //remain valid if we failover (see BLZ-24)
+        return getQueueName() == null;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native