You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [18/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,301 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.HashMap;
+import java.net.URISyntaxException;
+import java.net.URI;
+
+public class AMQBrokerDetails implements BrokerDetails
+{
+ private String _host;
+ private int _port;
+ private String _transport;
+
+ private HashMap<String, String> _options;
+
+    /**
+     * Creates empty broker details. Host, port and transport are left unset and the option map
+     * starts out empty.
+     */
+    public AMQBrokerDetails()
+    {
+        _options = new HashMap<String, String>();
+    }
+
+    /**
+     * Creates broker details by parsing a broker URL.
+     * <p>
+     * The expected format is {@code tcp://host:port?option='value',option='value'}. To minimise changes to
+     * existing broker strings, a URL with no scheme, a URL starting with {@code localhost}, and a bare
+     * {@code host:port} value are all re-parsed with {@code DEFAULT_TRANSPORT + "://"} prepended.
+     * The fragment part of the URL is not used.
+     *
+     * @param url the broker URL to parse
+     * @throws URLSyntaxException if the URL is rejected by {@link URI} or reported as invalid through
+     *                            {@code URLHelper.parseError} (unknown transport, non-numeric port)
+     */
+    public AMQBrokerDetails(String url) throws URLSyntaxException
+    {
+        this();
+
+        try
+        {
+            URI connection = new URI(url);
+            String transport = connection.getScheme();
+
+            // Handles some defaults to minimise changes to existing broker URLs e.g. localhost
+            if (transport == null)
+            {
+                // No scheme at all: default the transport
+                connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+                transport = connection.getScheme();
+            }
+            else if (!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp")))
+            {
+                //todo this list of valid transports should be enumerated somewhere
+                if (transport.equalsIgnoreCase("localhost")
+                    || (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/'))
+                {
+                    // Either "localhost..." or, most likely, a host:port value that URI read as scheme:rest
+                    connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+                    transport = connection.getScheme();
+                }
+                else
+                {
+                    URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+                }
+            }
+
+            if (transport == null)
+            {
+                URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+                                         " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+            }
+
+            setTransport(transport);
+
+            // Fix for Java 1.5: the host may be reported as null, store it as an empty string instead
+            String host = connection.getHost();
+            setHost(host == null ? "" : host);
+
+            int port = connection.getPort();
+
+            if (port != -1)
+            {
+                setPort(port);
+            }
+            else
+            {
+                // Another fix for Java 1.5 URI handling: the port may only be visible in the authority (":port")
+                String auth = connection.getAuthority();
+
+                if (auth != null && auth.startsWith(":"))
+                {
+                    String portText = auth.substring(1);
+                    try
+                    {
+                        setPort(Integer.parseInt(portText));
+                    }
+                    catch (NumberFormatException nfe)
+                    {
+                        // Report a malformed port as a URL syntax problem instead of leaking an unchecked exception
+                        URLHelper.parseError(-1, "Illegal port value '" + portText + "'", url);
+                    }
+                }
+                else
+                {
+                    setPort(DEFAULT_PORT);
+                }
+            }
+
+            String queryString = connection.getQuery();
+
+            URLHelper.parseOptions(_options, queryString);
+
+            //Fragment is #string (not used)
+        }
+        catch (URISyntaxException uris)
+        {
+            // Errors raised by URLHelper above are already of the right type: pass them through untouched
+            if (uris instanceof URLSyntaxException)
+            {
+                throw (URLSyntaxException) uris;
+            }
+
+            URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+        }
+    }
+
+    /**
+     * Creates broker details from an explicit host and port. The transport is not set by this constructor.
+     *
+     * @param host   the broker host
+     * @param port   the broker port
+     * @param useSSL if true the {@code OPTIONS_SSL} option is stored as {@code "true"}
+     */
+    public AMQBrokerDetails(String host, int port, boolean useSSL)
+    {
+        // Delegate to the default constructor so the option map exists before setOption() is called;
+        // without it setOption() fails with a NullPointerException.
+        this();
+
+        _host = host;
+        _port = port;
+
+        if (useSSL)
+        {
+            setOption(OPTIONS_SSL, "true");
+        }
+    }
+
+    /** @return the broker host as currently stored */
+    public String getHost()
+    {
+        return _host;
+    }
+
+    /** @param _host the broker host to store */
+    public void setHost(String _host)
+    {
+        this._host = _host;
+    }
+
+    /** @return the broker port as currently stored */
+    public int getPort()
+    {
+        return _port;
+    }
+
+    /** @param _port the broker port to store */
+    public void setPort(int _port)
+    {
+        this._port = _port;
+    }
+
+    /** @return the transport name as currently stored */
+    public String getTransport()
+    {
+        return _transport;
+    }
+
+    /** @param _transport the transport name to store */
+    public void setTransport(String _transport)
+    {
+        this._transport = _transport;
+    }
+
+
+    /**
+     * Looks up a broker option.
+     *
+     * @param key the option name
+     * @return the stored value, or null when the option map holds no value for the key
+     */
+    public String getOption(String key)
+    {
+        return _options.get(key);
+    }
+
+    /**
+     * Stores a broker option, replacing any previous value held for the same key.
+     *
+     * @param key   the option name
+     * @param value the option value
+     */
+    public void setOption(String key, String value)
+    {
+        _options.put(key, value);
+    }
+
+    /**
+     * Returns the connect timeout configured through the {@code OPTIONS_CONNECT_TIMEOUT} option.
+     *
+     * @return the configured value, or {@code BrokerDetails.DEFAULT_CONNECT_TIMEOUT} when the option is
+     *         missing or is not a valid long
+     */
+    public long getTimeout()
+    {
+        final String configured = _options.get(OPTIONS_CONNECT_TIMEOUT);
+
+        if (configured != null)
+        {
+            try
+            {
+                return Long.parseLong(configured);
+            }
+            catch (NumberFormatException nfe)
+            {
+                // Deliberately ignored: an unparsable value falls through to the default below.
+            }
+        }
+
+        return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
+    }
+
+    /**
+     * Stores the connect timeout as the {@code OPTIONS_CONNECT_TIMEOUT} option.
+     *
+     * @param timeout the timeout value to store
+     */
+    public void setTimeout(long timeout)
+    {
+        setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
+    }
+
+    /**
+     * Renders these details as {@code transport://host:port} followed by the option string built by
+     * {@code printOptionsURL()}. The host is omitted for the {@code vm} transport.
+     *
+     * @return the URL-style representation of this broker
+     */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(_transport);
+        sb.append("://");
+
+        // Null-safe comparison: the (host, port, useSSL) constructor never sets a transport, and
+        // toString() must not throw just because it is used in a log message.
+        if (!"vm".equalsIgnoreCase(_transport))
+        {
+            sb.append(_host);
+        }
+
+        sb.append(':');
+        sb.append(_port);
+
+        sb.append(printOptionsURL());
+
+        return sb.toString();
+    }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof BrokerDetails))
+ {
+ return false;
+ }
+
+ BrokerDetails bd = (BrokerDetails) o;
+
+ return _host.equalsIgnoreCase(bd.getHost()) &&
+ (_port == bd.getPort()) &&
+ _transport.equalsIgnoreCase(bd.getTransport()) &&
+ (useSSL() == bd.useSSL());
+
+ //todo do we need to compare all the options as well?
+ }
+
+    /**
+     * Builds the option part of the broker URL: a {@code '?'} followed by {@code key='value'} pairs, each
+     * followed by {@code URLHelper.DEFAULT_OPTION_SEPERATOR}, with the final character then removed.
+     *
+     * @return the option string, or an empty string when there are no options
+     */
+    private String printOptionsURL()
+    {
+        StringBuffer rendered = new StringBuffer("?");
+
+        for (java.util.Map.Entry<String, String> option : _options.entrySet())
+        {
+            rendered.append(option.getKey());
+            rendered.append("='");
+            rendered.append(option.getValue());
+            rendered.append("'");
+            rendered.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+        }
+
+        // Drops the trailing separator, or the lone '?' when no option was written
+        rendered.deleteCharAt(rendered.length() - 1);
+
+        return rendered.toString();
+    }
+
+    /**
+     * Reports whether SSL is requested for this broker.
+     *
+     * @return true only if the {@code OPTIONS_SSL} option is present and equals {@code "true"},
+     *         ignoring the case of the value
+     */
+    public boolean useSSL()
+    {
+        // To be friendly to users we should be case insensitive.
+        // or simply force users to conform to OPTIONS_SSL
+        // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+        // Constant-first comparison: a missing option and an option stored with a null value both yield
+        // false instead of a NullPointerException.
+        return "true".equalsIgnoreCase(_options.get(OPTIONS_SSL));
+    }
+
+    /**
+     * Stores the SSL flag as the {@code OPTIONS_SSL} option.
+     *
+     * @param ssl whether SSL should be used
+     */
+    public void useSSL(boolean ssl)
+    {
+        setOption(OPTIONS_SSL, Boolean.toString(ssl));
+    }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,927 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.*;
+import org.apache.qpid.jms.Connection;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+
+public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
+{
+ private static final Logger _logger = Logger.getLogger(AMQConnection.class);
+
+ private AtomicInteger _idFactory = new AtomicInteger(0);
+
+ /**
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
+ * This must be held by any child objects of this connection such as the session, producers and consumers.
+ */
+ private final Object _failoverMutex = new Object();
+
+ /**
+ * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
+ * per session and we must prevent the client from opening too many. Zero means unlimited.
+ */
+ private long _maximumChannelCount;
+
+ /**
+ * The maximum size of frame supported by the server
+ */
+ private long _maximumFrameSize;
+
+ /**
+ * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
+ * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
+ * handler.
+ */
+ private AMQProtocolHandler _protocolHandler;
+
+ /**
+ * Maps from session id (Integer) to AMQSession instance
+ */
+ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+
+ private String _clientName;
+
+ /**
+ * The user name to use for authentication
+ */
+ private String _username;
+
+ /**
+ * The password to use for authentication
+ */
+ private String _password;
+
+ /**
+ * The virtual path to connect to on the AMQ server
+ */
+ private String _virtualHost;
+
+ private ExceptionListener _exceptionListener;
+
+ private ConnectionListener _connectionListener;
+
+ private ConnectionURL _connectionURL;
+
+ /**
+ * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
+ * message publication.
+ */
+ private boolean _started;
+
+ /**
+ * Policy dictating how to failover
+ */
+ private FailoverPolicy _failoverPolicy;
+
+ /*
+ * _Connected should be refactored with a suitable wait object.
+ */
+ private boolean _connected;
+
+ /*
+ * The last error code that occurred on the connection. Used to return the correct exception to the client
+ */
+ private AMQException _lastAMQException = null;
+
+    /**
+     * Creates a connection from a broker string and credentials by assembling a connection URL of the form
+     * {@code <protocol>://username:password@clientName<virtualHost>?brokerlist='<broker>'}.
+     *
+     * @throws AMQException       if the connection cannot be established
+     * @throws URLSyntaxException if the assembled URL is invalid
+     */
+    public AMQConnection(String broker, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://"
+                                  + username + ":" + password + "@" + clientName
+                                  + virtualHost + "?brokerlist='" + broker + "'"));
+    }
+
+    /**
+     * Creates a connection to {@code host:port} without SSL.
+     *
+     * @throws AMQException       if the connection cannot be established
+     * @throws URLSyntaxException if the assembled URL is invalid
+     */
+    public AMQConnection(String host, int port, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        this(host, port, false, username, password, clientName, virtualHost);
+    }
+
+    /**
+     * Creates a connection to {@code tcp://host:port}, appending the SSL option with the value of
+     * {@code useSSL} to the assembled connection URL.
+     *
+     * @throws AMQException       if the connection cannot be established
+     * @throws URLSyntaxException if the assembled URL is invalid
+     */
+    public AMQConnection(String host, int port, boolean useSSL, String username, String password,
+                         String clientName, String virtualHost) throws AMQException, URLSyntaxException
+    {
+        // The two original URL variants differed only in the quoted boolean at the end
+        this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://"
+                                  + username + ":" + password + "@" + clientName
+                                  + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+                                  + "," + ConnectionURL.OPTIONS_SSL + "='" + useSSL + "'"));
+    }
+
+    /**
+     * Creates a connection from a connection URL string.
+     *
+     * @throws AMQException       if the connection cannot be established
+     * @throws URLSyntaxException if the URL is invalid
+     */
+    public AMQConnection(String connection) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(connection));
+    }
+
+    /**
+     * Creates a connection from a parsed connection URL and tries to connect to a broker.
+     * <p>
+     * Brokers supplied by the failover policy are tried while the previous failure was caused by a
+     * {@link ConnectException} and the policy still allows failover. If failover is still allowed after
+     * that, the constructor polls until the connection is marked connected or closed; otherwise it throws.
+     *
+     * @param connectionURL the connection details; must not be null
+     * @throws IllegalArgumentException if {@code connectionURL} is null
+     * @throws AMQException             if no connection could be made
+     */
+    public AMQConnection(ConnectionURL connectionURL) throws AMQException
+    {
+        // Validate first, so a missing URL is reported before anything is logged or dereferenced
+        if (connectionURL == null)
+        {
+            throw new IllegalArgumentException("Connection must be specified");
+        }
+
+        _logger.info("Connection:" + connectionURL);
+
+        _connectionURL = connectionURL;
+
+        _clientName = connectionURL.getClientName();
+        _username = connectionURL.getUsername();
+        _password = connectionURL.getPassword();
+        _virtualHost = connectionURL.getVirtualHost();
+
+        _failoverPolicy = new FailoverPolicy(connectionURL);
+
+        _protocolHandler = new AMQProtocolHandler(this);
+
+        // We are not currently connected
+        _connected = false;
+
+        // Seed with a ConnectException cause so that the loop below is entered at least once
+        Exception lastException = new Exception();
+        lastException.initCause(new ConnectException());
+
+        while (lastException != null && lastException.getCause() instanceof ConnectException && _failoverPolicy.failoverAllowed())
+        {
+            try
+            {
+                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+                lastException = null;
+            }
+            catch (Exception e)
+            {
+                lastException = e;
+
+                _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+                _logger.info(e);
+                _logger.info(e.getCause());
+            }
+        }
+
+        _logger.debug("Are we connected:" + _connected);
+
+        // Then the Failover Thread will handle connection
+        if (_failoverPolicy.failoverAllowed())
+        {
+            //TODO this needs to be redone so that we are not spinning.
+            // A suitable object should be set that is then waited on
+            // and only notified when a connection is made or when
+            // the AMQConnection gets closed.
+            boolean interrupted = false;
+            while (!_connected && !_closed.get())
+            {
+                try
+                {
+                    _logger.debug("Sleeping.");
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException ie)
+                {
+                    _logger.debug("Woken up.");
+                    // Keep waiting as before, but remember the interrupt so it is not lost
+                    interrupted = true;
+                }
+            }
+            if (interrupted)
+            {
+                // Restore the interrupt status for the caller now that the wait is over
+                Thread.currentThread().interrupt();
+            }
+
+            if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
+            {
+                if (_lastAMQException != null)
+                {
+                    throw _lastAMQException;
+                }
+            }
+        }
+        else
+        {
+            String message = null;
+
+            if (lastException != null)
+            {
+                if (lastException.getCause() != null)
+                {
+                    message = lastException.getCause().getMessage();
+                }
+                else
+                {
+                    message = lastException.getMessage();
+                }
+            }
+
+            if (message == null || message.equals(""))
+            {
+                message = "Unable to Connect";
+            }
+
+            AMQException e = new AMQConnectionException(message);
+
+            if (lastException != null)
+            {
+                if (lastException instanceof UnresolvedAddressException)
+                {
+                    e = new AMQUnresolvedAddressException(message);
+                }
+                e.initCause(lastException);
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Records the credentials, client name and virtual host only. No connection URL, failover policy or
+     * protocol handler is set up and no broker connection is attempted.
+     */
+    protected AMQConnection(String username, String password, String clientName, String virtualHost)
+    {
+        _username = username;
+        _password = password;
+        _clientName = clientName;
+        _virtualHost = virtualHost;
+    }
+
+    /**
+     * Connects the protocol handler to the given broker and waits for the connection to open.
+     * On success the failover policy is notified and the connection is flagged as connected.
+     *
+     * @param brokerDetail the broker to connect to
+     * @throws IOException  declared for the underlying transport connect call
+     * @throws AMQException if the connection could not be opened; the exception is also remembered as the
+     *                      last AMQ exception of this connection
+     */
+    private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        try
+        {
+            TransportConnection.getInstance().connect(_protocolHandler, brokerDetail);
+
+            // Blocks until the connection has been set up, or until an error has prevented that
+            _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+            _failoverPolicy.attainedConnection();
+
+            // Again this should be changed to a suitable notify
+            _connected = true;
+        }
+        catch (AMQException failure)
+        {
+            _lastAMQException = failure;
+            throw failure;
+        }
+    }
+
+    /**
+     * Tries to reconnect to the given broker and, if that fails, falls back to the brokers offered by the
+     * failover policy.
+     *
+     * @param host   the broker host to try first
+     * @param port   the broker port to try first
+     * @param useSSL whether SSL should be used for that broker
+     * @return true if a connection was established, either to the given broker or through the fallback
+     */
+    public boolean attemptReconnection(String host, int port, boolean useSSL)
+    {
+        BrokerDetails bd = new AMQBrokerDetails(host, port, useSSL);
+
+        _failoverPolicy.setBroker(bd);
+
+        try
+        {
+            makeBrokerConnection(bd);
+            return true;
+        }
+        catch (Exception e)
+        {
+            // Log from the arguments instead of bd.toString(): the (host, port, useSSL) constructor sets no
+            // transport, and AMQBrokerDetails.toString() dereferences it.
+            _logger.info("Unable to connect to broker at " + host + ":" + port, e);
+
+            // Report the outcome of the fallback rather than discarding it and always returning false
+            return attemptReconnection();
+        }
+    }
+
+    /**
+     * Tries the brokers offered by the failover policy, one after another, for as long as the policy
+     * allows failover.
+     *
+     * @return true as soon as one connection attempt succeeds, false once failover is no longer allowed
+     */
+    public boolean attemptReconnection()
+    {
+        while (_failoverPolicy.failoverAllowed())
+        {
+            try
+            {
+                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+                return true;
+            }
+            catch (Exception failure)
+            {
+                if (failure instanceof AMQException)
+                {
+                    // AMQ errors are logged by message only
+                    _logger.info(failure.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+                }
+                else
+                {
+                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), failure);
+                }
+            }
+        }
+
+        //connection unsuccessful
+        return false;
+    }
+
+    /**
+     * Returns the broker details the failover policy currently points at.
+     *
+     * @return the failover policy's current broker details; null if no broker is active
+     *         (i.e. no successful connection has been made)
+     */
+    public BrokerDetails getActiveBrokerDetails()
+    {
+        return _failoverPolicy.getCurrentBrokerDetails();
+    }
+
+    /** @return whether the failover policy currently allows failover */
+    public boolean failoverAllowed()
+    {
+        return _failoverPolicy.failoverAllowed();
+    }
+
+    /**
+     * Creates a session using {@code AMQSession.DEFAULT_PREFETCH} as the prefetch value.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode of the session
+     * @return the new session
+     * @throws JMSException if the session cannot be created
+     */
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH);
+    }
+
+    /**
+     * Creates a session on a new channel, running the work through {@code FailoverSupport}.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode of the session
+     * @param prefetch        the prefetch value sent to the server for the channel
+     * @return the new session; it is started immediately if this connection is already started
+     * @throws JMSException if the connection is closed, the channel limit is reached, or the channel
+     *                      cannot be opened
+     */
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+                                                     final int prefetch) throws JMSException
+    {
+        checkNotClosed();
+
+        if (channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_maximumChannelCount);
+        }
+
+        return (org.apache.qpid.jms.Session) new FailoverSupport()
+        {
+            public Object operation() throws JMSException
+            {
+                final int channelId = _idFactory.incrementAndGet();
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Write channel open frame for channel id " + channelId);
+                }
+
+                // The session is created and registered before the open frame goes to the server, so that
+                // there is no window in which data could arrive on the channel with nothing set up to
+                // handle it.
+                final AMQSession newSession = new AMQSession(AMQConnection.this, channelId, transacted,
+                                                             acknowledgeMode, prefetch);
+                _protocolHandler.addSessionByChannel(channelId, newSession);
+                registerSession(channelId, newSession);
+
+                boolean opened = false;
+                try
+                {
+                    createChannelOverWire(channelId, prefetch, transacted);
+                    opened = true;
+                }
+                catch (AMQException e)
+                {
+                    JMSException failure = new JMSException("Error creating session: " + e);
+                    failure.setLinkedException(e);
+                    throw failure;
+                }
+                finally
+                {
+                    // Undo the registration on any failure, checked or unchecked
+                    if (!opened)
+                    {
+                        _protocolHandler.removeSessionByChannel(channelId);
+                        deregisterSession(channelId);
+                    }
+                }
+
+                if (_started)
+                {
+                    newSession.start();
+                }
+                return newSession;
+            }
+        }.execute(this);
+    }
+
+    /**
+     * Opens a channel on the server with synchronous writes: a channel-open frame, then a basic-qos frame
+     * carrying the prefetch value, then, for transacted channels, a tx-select frame.
+     *
+     * @param channelId  the channel to open
+     * @param prefetch   the prefetch value passed in the qos frame
+     * @param transacted whether to issue the tx-select
+     * @throws AMQException if any of the synchronous writes fails
+     */
+    private void createChannelOverWire(int channelId, int prefetch, boolean transacted)
+            throws AMQException
+    {
+        _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, null),
+                                   ChannelOpenOkBody.class);
+        _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, 0, prefetch, false),
+                                   BasicQosOkBody.class);
+
+        if (!transacted)
+        {
+            return;
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Issuing TxSelect for " + channelId);
+        }
+        _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+    }
+
+    /**
+     * Re-opens a channel over the wire after failover. If that fails, the session registered for the
+     * channel is removed from both the protocol handler and this connection.
+     *
+     * @param channelId  the channel to reopen
+     * @param prefetch   the prefetch value for the channel
+     * @param transacted whether the channel is transacted
+     * @throws AMQException if the channel cannot be reopened; the original failure is attached as its cause
+     */
+    private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetch, transacted);
+        }
+        catch (AMQException e)
+        {
+            _protocolHandler.removeSessionByChannel(channelId);
+            deregisterSession(channelId);
+
+            // Keep the original failure as the cause so that its stack trace is not lost
+            AMQException reopenFailure =
+                    new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+            reopenFailure.initCause(e);
+            throw reopenFailure;
+        }
+    }
+
+
+    /** @param policy the failover policy this connection should use from now on */
+    public void setFailoverPolicy(FailoverPolicy policy)
+    {
+        _failoverPolicy = policy;
+    }
+
+    /** @return the failover policy currently in use */
+    public FailoverPolicy getFailoverPolicy()
+    {
+        return _failoverPolicy;
+    }
+
+    /**
+     * Creates a session through {@code createSession} and returns it as a {@code QueueSession}.
+     *
+     * @throws JMSException if the session cannot be created
+     */
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        return (QueueSession) createSession(transacted, acknowledgeMode);
+    }
+
+    /**
+     * Creates a session through {@code createSession} and returns it as a {@code TopicSession}.
+     *
+     * @throws JMSException if the session cannot be created
+     */
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        return (TopicSession) createSession(transacted, acknowledgeMode);
+    }
+
+    /**
+     * Checks whether another session may be opened on this connection.
+     *
+     * @return true if a maximum channel count is set (non-zero) and the number of registered sessions has
+     *         reached or passed it
+     */
+    private boolean channelLimitReached()
+    {
+        // ">=" rather than "==": the limit can be changed through setMaximumChannelCount() while sessions
+        // are open, which can leave the session count above the new limit.
+        return _maximumChannelCount != 0 && _sessions.size() >= _maximumChannelCount;
+    }
+
+    /**
+     * @return the client name of this connection
+     * @throws JMSException if the closed-check fails
+     */
+    public String getClientID() throws JMSException
+    {
+        checkNotClosed();
+        return _clientName;
+    }
+
+    /**
+     * @param clientID the new client name
+     * @throws JMSException if the closed-check fails
+     */
+    public void setClientID(String clientID) throws JMSException
+    {
+        checkNotClosed();
+        _clientName = clientID;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always null
+     * @throws JMSException if the closed-check fails
+     */
+    public ConnectionMetaData getMetaData() throws JMSException
+    {
+        checkNotClosed();
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * @return the registered exception listener, or null if none was set
+     * @throws JMSException if the closed-check fails
+     */
+    public ExceptionListener getExceptionListener() throws JMSException
+    {
+        checkNotClosed();
+        return _exceptionListener;
+    }
+
+    /**
+     * @param listener the listener to hand exceptions to in {@code exceptionReceived}
+     * @throws JMSException if the closed-check fails
+     */
+    public void setExceptionListener(ExceptionListener listener) throws JMSException
+    {
+        checkNotClosed();
+        _exceptionListener = listener;
+    }
+
+    /**
+     * Starts the connection, i.e. starts the flow of messages, by starting every registered session.
+     * Does nothing if the connection is already started. This method must only be called from a single
+     * thread and is not thread safe (which is legal according to the JMS specification).
+     *
+     * @throws JMSException if the closed-check fails or a session fails to start
+     */
+    public void start() throws JMSException
+    {
+        checkNotClosed();
+
+        if (_started)
+        {
+            return;
+        }
+
+        for (Iterator sessions = _sessions.values().iterator(); sessions.hasNext();)
+        {
+            ((AMQSession) sessions.next()).start();
+        }
+        _started = true;
+    }
+
+    /**
+     * Stops every registered session and marks the connection as not started. Does nothing if the
+     * connection is not started.
+     *
+     * @throws JMSException if the closed-check fails or a session fails to stop
+     */
+    public void stop() throws JMSException
+    {
+        checkNotClosed();
+
+        if (!_started)
+        {
+            return;
+        }
+
+        final Iterator sessions = _sessions.values().iterator();
+        while (sessions.hasNext())
+        {
+            final AMQSession session = (AMQSession) sessions.next();
+            session.stop();
+        }
+        _started = false;
+    }
+
+    /**
+     * Closes all sessions and then the protocol connection, while holding the failover mutex.
+     * Only the first call has any effect; later calls return without doing anything.
+     *
+     * @throws JMSException if closing a session or the protocol connection fails; an underlying
+     *                      {@code AMQException} is attached as the linked exception
+     */
+    public void close() throws JMSException
+    {
+        synchronized (getFailoverMutex())
+        {
+            if (!_closed.getAndSet(true))
+            {
+                try
+                {
+                    closeAllSessions(null);
+                    _protocolHandler.closeConnection();
+                }
+                catch (AMQException e)
+                {
+                    // Link the original failure, as createSession() does, so that it is not reduced to a string
+                    JMSException jmse = new JMSException("Error closing connection: " + e);
+                    jmse.setLinkedException(e);
+                    throw jmse;
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks all sessions as closed without closing them through {@code close()}, then forgets them.
+     * Useful when objects "visible" in userland need to be marked closed after failover or another
+     * significant event that impacts the connection.
+     * <p/>
+     * The caller must hold the failover mutex before calling this method.
+     */
+    private void markAllSessionsClosed()
+    {
+        // Iterate over a snapshot of the sessions rather than the live map view
+        final Object[] snapshot = _sessions.values().toArray();
+        for (int i = 0; i < snapshot.length; i++)
+        {
+            ((AMQSession) snapshot[i]).markClosed();
+        }
+        _sessions.clear();
+    }
+
+    /**
+     * Closes all the sessions, either due to normal connection closure or due to an error, and then
+     * forgets them.
+     * <p/>
+     * The caller must hold the failover mutex before calling this method.
+     *
+     * @param cause if not null, the error that is causing this shutdown; it is passed to each session
+     *              instead of closing the session normally
+     * @throws JMSException the last exception raised by a normal session close, thrown only after every
+     *                      session has been processed
+     */
+    private void closeAllSessions(Throwable cause) throws JMSException
+    {
+        JMSException lastFailure = null;
+
+        // Iterate over a snapshot of the sessions rather than the live map view
+        final Iterator sessions = new LinkedList(_sessions.values()).iterator();
+        while (sessions.hasNext())
+        {
+            final AMQSession session = (AMQSession) sessions.next();
+
+            if (cause != null)
+            {
+                session.closed(cause);
+                continue;
+            }
+
+            try
+            {
+                session.close();
+            }
+            catch (JMSException e)
+            {
+                _logger.error("Error closing session: " + e);
+                lastFailure = e;
+            }
+        }
+
+        _sessions.clear();
+
+        if (lastFailure != null)
+        {
+            throw lastFailure;
+        }
+    }
+
+    /**
+     * Not implemented: performs the closed-check and returns null.
+     *
+     * @return always null
+     * @throws JMSException if the closed-check fails
+     */
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    /**
+     * Not implemented: performs the closed-check and returns null.
+     *
+     * @return always null
+     * @throws JMSException if the closed-check fails
+     */
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    /**
+     * Not implemented: performs the closed-check and returns null.
+     *
+     * @return always null
+     * @throws JMSException if the closed-check fails
+     */
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+                                                       ServerSessionPool sessionPool,
+                                                       int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    /**
+     * Not implemented: performs the closed-check and returns null.
+     *
+     * @return always null
+     * @throws JMSException if the closed-check fails
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+                                                              String messageSelector, ServerSessionPool sessionPool,
+                                                              int maxMessages)
+            throws JMSException
+    {
+        // TODO Auto-generated method stub
+        checkNotClosed();
+        return null;
+    }
+
+    /** @return the maximum number of channels; zero means unlimited */
+    public long getMaximumChannelCount()
+    {
+        checkNotClosed();
+        return _maximumChannelCount;
+    }
+
+    /** @param listener the listener to notify of byte counts and failover events */
+    public void setConnectionListener(ConnectionListener listener)
+    {
+        _connectionListener = listener;
+    }
+
+    /** @return the registered connection listener, or null if none was set */
+    public ConnectionListener getConnectionListener()
+    {
+        return _connectionListener;
+    }
+
+    /** @param maximumChannelCount the maximum number of channels; zero means unlimited */
+    public void setMaximumChannelCount(long maximumChannelCount)
+    {
+        checkNotClosed();
+        _maximumChannelCount = maximumChannelCount;
+    }
+
+    /** @param frameMax the maximum size of frame supported by the server */
+    public void setMaximumFrameSize(long frameMax)
+    {
+        _maximumFrameSize = frameMax;
+    }
+
+    /** @return the maximum size of frame supported by the server */
+    public long getMaximumFrameSize()
+    {
+        return _maximumFrameSize;
+    }
+
+    /** @return the live map of sessions held by this connection (not a copy) */
+    public Map getSessions()
+    {
+        return _sessions;
+    }
+
+    /** @return the user name used for authentication */
+    public String getUsername()
+    {
+        return _username;
+    }
+
+    /** @return the password used for authentication */
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    /** @return the virtual path to connect to on the AMQ server */
+    public String getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    /** @return the protocol handler of this connection */
+    public AMQProtocolHandler getProtocolHandler()
+    {
+        return _protocolHandler;
+    }
+
+    /**
+     * Forwards a written-bytes count to the connection listener, if one is registered.
+     *
+     * @param writtenBytes the byte count to report
+     */
+    public void bytesSent(long writtenBytes)
+    {
+        if (_connectionListener == null)
+        {
+            return;
+        }
+        _connectionListener.bytesSent(writtenBytes);
+    }
+
+    /**
+     * Forwards a received-bytes count to the connection listener, if one is registered.
+     *
+     * @param receivedBytes the byte count to report
+     */
+    public void bytesReceived(long receivedBytes)
+    {
+        if (_connectionListener == null)
+        {
+            return;
+        }
+        _connectionListener.bytesReceived(receivedBytes);
+    }
+
+    /**
+     * Fires the preFailover event to the registered connection listener (if any).
+     *
+     * @param redirect true if this is the result of a redirect request rather than a connection error
+     * @return true if there is no listener or the listener does not veto the change
+     */
+    public boolean firePreFailover(boolean redirect)
+    {
+        return _connectionListener == null || _connectionListener.preFailover(redirect);
+    }
+
+    /**
+     * Fires the preResubscribe event to the registered connection listener (if any). If the listener
+     * vetoes resubscription then all the sessions are marked closed.
+     *
+     * @return true if there is no listener or the listener does not veto resubscription
+     * @throws JMSException
+     */
+    public boolean firePreResubscribe() throws JMSException
+    {
+        if (_connectionListener == null)
+        {
+            return true;
+        }
+
+        final boolean resubscribe = _connectionListener.preResubscribe();
+        if (!resubscribe)
+        {
+            markAllSessionsClosed();
+        }
+        return resubscribe;
+    }
+
+    /**
+     * Fires a failover complete event to the registered connection listener (if any).
+     */
+    public void fireFailoverComplete()
+    {
+        if (_connectionListener == null)
+        {
+            return;
+        }
+        _connectionListener.failoverComplete();
+    }
+
+    /**
+     * Returns the "failover mutex". To protect the consistency of the connection and its child sessions,
+     * consumers and producers, it must be held during any operation that could be corrupted by failover.
+     *
+     * @return the mutex; it is held in a final field, so it never changes for the lifetime of this
+     *         connection even if failover occurs
+     */
+    public final Object getFailoverMutex()
+    {
+        return _failoverMutex;
+    }
+
+    /**
+     * Delegates to the protocol handler: blocks until a failover in progress has completed, and returns
+     * immediately if none is taking place.
+     *
+     * @throws InterruptedException
+     */
+    public void blockUntilNotFailingOver() throws InterruptedException
+    {
+        _protocolHandler.blockUntilNotFailingOver();
+    }
+
+    /**
+     * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
+     * The exception is passed to the JMS exception listener, if one is configured, and, unless it is an
+     * {@code AMQUndeliveredException} or an {@code AMQAuthenticationException}, the connection is marked
+     * closed and the exception is propagated to all sessions.
+     *
+     * @param cause the exception
+     */
+    public void exceptionReceived(Throwable cause)
+    {
+
+        _logger.debug("Connection Close done by:" + Thread.currentThread().getName());
+        _logger.debug("exceptionReceived is ", cause);
+
+        // Listeners are always handed a JMSException: wrap anything else
+        final JMSException jmsException;
+        if (cause instanceof JMSException)
+        {
+            jmsException = (JMSException) cause;
+        }
+        else
+        {
+            jmsException = new JMSException("Exception thrown against " + toString() + ": " + cause);
+            if (cause instanceof Exception)
+            {
+                jmsException.setLinkedException((Exception) cause);
+            }
+        }
+
+        // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
+        // so that any generic client code that tries to close the connection will not mess up this error
+        // handling sequence
+        if (cause instanceof IOException)
+        {
+            _closed.set(true);
+        }
+
+        if (_exceptionListener != null)
+        {
+            _exceptionListener.onException(jmsException);
+        }
+
+        final boolean softError = (cause instanceof AMQUndeliveredException)
+                                  || (cause instanceof AMQAuthenticationException);
+        if (softError)
+        {
+            _logger.info("Not a hard-error connection not closing.");
+        }
+        else
+        {
+            try
+            {
+                _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+                _closed.set(true);
+                closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+            }
+            catch (JMSException e)
+            {
+                _logger.error("Error closing all sessions: " + e, e);
+            }
+
+        }
+    }
+
+    /**
+     * Records a session under its channel id.
+     *
+     * @param channelId the channel id used as the key
+     * @param session   the session to record
+     */
+    void registerSession(int channelId, AMQSession session)
+    {
+        _sessions.put(channelId, session);
+    }
+
+    /**
+     * Forgets the session recorded under the given channel id.
+     *
+     * @param channelId the channel id whose session should be removed
+     */
+    void deregisterSession(int channelId)
+    {
+        _sessions.remove(channelId);
+    }
+
+    /**
+     * For every session: registers it with the protocol handler again, reopens its channel and asks it to
+     * resubscribe. This is called during failover handling.
+     * The caller must hold the failover mutex before calling this method.
+     *
+     * @throws AMQException if a channel cannot be reopened or a session fails to resubscribe
+     */
+    public void resubscribeSessions() throws AMQException
+    {
+        // Work on a snapshot of the sessions rather than the live map view
+        final ArrayList snapshot = new ArrayList(_sessions.values());
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", snapshot, snapshot.size())); // FIXME: remove?
+
+        final Iterator sessions = snapshot.iterator();
+        while (sessions.hasNext())
+        {
+            final AMQSession session = (AMQSession) sessions.next();
+            _protocolHandler.addSessionByChannel(session.getChannelId(), session);
+            reopenChannel(session.getChannelId(), session.getDefaultPrefetch(), session.getTransacted());
+            session.resubscribe();
+        }
+    }
+
+    /**
+     * Describes this connection: the active broker's host and port (or a note that there is none), the
+     * virtual host, the client id and the number of registered sessions.
+     *
+     * @return a multi-line description of this connection
+     */
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("AMQConnection:\n");
+
+        // The protected constructor never sets a failover policy, and toString() is used while building
+        // error messages (see exceptionReceived), so it must not throw.
+        BrokerDetails bd = (_failoverPolicy == null) ? null : _failoverPolicy.getCurrentBrokerDetails();
+
+        if (bd == null)
+        {
+            buf.append("No active broker connection");
+        }
+        else
+        {
+            buf.append("Host: ").append(String.valueOf(bd.getHost()));
+            buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
+        }
+        buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
+        buf.append("\nClient ID: ").append(String.valueOf(_clientName));
+        buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size());
+        return buf.toString();
+    }
+
+    /** @return the string form of the connection URL this connection was created from */
+    public String toURL()
+    {
+        return _connectionURL.toString();
+    }
+
+    /**
+     * Builds a JNDI reference for this connection: a string address holding the connection URL, with
+     * {@code AMQConnectionFactory} named as the object factory and no factory location.
+     *
+     * @return the reference
+     * @throws NamingException declared by the {@code Referenceable} contract
+     */
+    public Reference getReference() throws NamingException
+    {
+        final String connectionClass = AMQConnection.class.getName();
+        final StringRefAddr urlAddress = new StringRefAddr(connectionClass, toURL());
+
+        return new Reference(connectionClass,
+                             urlAddress,
+                             AMQConnectionFactory.class.getName(),
+                             null); // factory location
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,358 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.jms.ConnectionURL;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.naming.*;
+import javax.naming.spi.ObjectFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Hashtable;
+
+
+public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable
+{
+ private String _host;
+ private int _port;
+ private String _defaultUsername;
+ private String _defaultPassword;
+ private String _virtualPath;
+
+ private ConnectionURL _connectionDetails;
+
+
+ public AMQConnectionFactory()
+ {
+ }
+
+ public AMQConnectionFactory(String url) throws URLSyntaxException
+ {
+ _connectionDetails = new AMQConnectionURL(url);
+ }
+
+ public AMQConnectionFactory(ConnectionURL url)
+ {
+ _connectionDetails = url;
+ }
+
+ /**
+  * Creates a factory by assembling a connection URL from its parts and parsing it with
+  * AMQConnectionURL. The assembled text is
+  * {@code AMQ_PROTOCOL + "://" + username + ":" + password + "@" + clientName + virtualHost + "?brokerlist='" + broker + "'"}.
+  * Note that clientName and virtualHost are concatenated with no separator between them.
+  *
+  * @param broker      the text placed inside the brokerlist option
+  * @param username    the user name placed in the URL
+  * @param password    the password placed in the URL
+  * @param clientName  the client name placed after the '@'
+  * @param virtualHost the virtual host appended directly after the client name
+  *                    (presumably expected to start with '/' - TODO confirm)
+  * @throws URLSyntaxException if the assembled URL cannot be parsed
+  */
+ public AMQConnectionFactory(String broker, String username, String password,
+ String clientName, String virtualHost) throws URLSyntaxException
+ {
+ this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
+ username + ":" + password + "@" + clientName +
+ virtualHost + "?brokerlist='" + broker + "'"));
+ }
+
+ public AMQConnectionFactory(String host, int port, String virtualPath)
+ {
+ this(host, port, "guest", "guest", virtualPath);
+ }
+
+ /**
+  * Creates a factory that stores the broker location and default credentials in its own
+  * fields. No connection URL is created here, so {@code _connectionDetails} stays unset.
+  *
+  * @param host            the broker host
+  * @param port            the broker port
+  * @param defaultUsername the default user name
+  * @param defaultPassword the default password
+  * @param virtualPath     the virtual path
+  */
+ public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword,
+ String virtualPath)
+ {
+ _host = host;
+ _port = port;
+ _defaultUsername = defaultUsername;
+ _defaultPassword = defaultPassword;
+ _virtualPath = virtualPath;
+
+//todo when setting Host/Port has been resolved then we can use this otherwise those methods won't work with the following line.
+// _connectionDetails = new AMQConnectionURL(
+// ConnectionURL.AMQ_PROTOCOL + "://" +
+// _defaultUsername + ":" + _defaultPassword + "@" +
+// virtualPath + "?brokerlist='tcp://" + host + ":" + port + "'");
+ }
+
+ /**
+ * @return The _defaultPassword.
+ */
+ public final String getDefaultPassword(String password)
+ {
+ if (_connectionDetails != null)
+ {
+ return _connectionDetails.getPassword();
+ }
+ else
+ {
+ return _defaultPassword;
+ }
+ }
+
+ /**
+  * Sets the default password. When a connection URL is present the password is written
+  * to it as well as to the local field.
+  *
+  * @param password the default password to set
+  */
+ public final void setDefaultPassword(String password)
+ {
+     final ConnectionURL details = _connectionDetails;
+     if (details != null)
+     {
+         details.setPassword(password);
+     }
+
+     _defaultPassword = password;
+ }
+
+ /**
+ * @return The _defaultPassword.
+ */
+ public final String getDefaultUsername(String password)
+ {
+ if (_connectionDetails != null)
+ {
+ return _connectionDetails.getUsername();
+ }
+ else
+ {
+ return _defaultUsername;
+ }
+ }
+
+ /**
+  * Sets the default user name. When a connection URL is present the user name is written
+  * to it as well as to the local field.
+  *
+  * @param username the default user name to set
+  */
+ public final void setDefaultUsername(String username)
+ {
+     final ConnectionURL details = _connectionDetails;
+     if (details != null)
+     {
+         details.setUsername(username);
+     }
+
+     _defaultUsername = username;
+ }
+
+ /**
+ * @return The _host .
+ */
+ public final String getHost()
+ {
+ //todo this doesn't make sense in a multi broker URL as we have no current as that is done by AMQConnection
+ return _host;
+ }
+
+ /**
+ * @param host The _host to set.
+ */
+ public final void setHost(String host)
+ {
+ //todo if _connectionDetails is set then run _connectionDetails.addBrokerDetails()
+ // Should perhaps have this method changed to setBroker(host,port)
+ _host = host;
+ }
+
+ /**
+  * @return the port stored on this factory (the _port field); the connection URL is not consulted.
+  */
+ public final int getPort()
+ {
+ //todo see getHost
+ return _port;
+ }
+
+ /**
+ * @param port The port to set.
+ */
+ public final void setPort(int port)
+ {
+ //todo see setHost
+ _port = port;
+ }
+
+ /**
+ * @return he _virtualPath.
+ */
+ public final String getVirtualPath()
+ {
+ if (_connectionDetails != null)
+ {
+ return _connectionDetails.getVirtualHost();
+ }
+ else
+ {
+ return _virtualPath;
+ }
+ }
+
+ /**
+  * Sets the virtual path. When a connection URL is present the value is written to it
+  * as its virtual host, as well as to the local field.
+  *
+  * @param path the virtual path to set
+  */
+ public final void setVirtualPath(String path)
+ {
+     final ConnectionURL details = _connectionDetails;
+     if (details != null)
+     {
+         details.setVirtualHost(path);
+     }
+
+     _virtualPath = path;
+ }
+
+ /**
+  * Builds a client identifier from the local host name followed by the current time
+  * in milliseconds.
+  *
+  * @return the identifier, or null when the local host cannot be resolved
+  */
+ static String getUniqueClientID()
+ {
+ try
+ {
+ InetAddress addr = InetAddress.getLocalHost();
+ return addr.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ // No host name available: report that to the caller as null rather than failing here.
+ return null;
+ }
+ }
+
+ /**
+  * Creates a connection using this factory's settings. When a connection URL is present
+  * it is used, after being given a generated client name if it has none or an empty one;
+  * otherwise the host, port, default credentials and virtual path fields are used.
+  *
+  * @return the new connection
+  * @throws JMSException wrapping (as linked exception) any exception raised while creating the connection
+  */
+ public Connection createConnection() throws JMSException
+ {
+     try
+     {
+         if (_connectionDetails == null)
+         {
+             return new AMQConnection(_host, _port, _defaultUsername, _defaultPassword, getUniqueClientID(),
+                                      _virtualPath);
+         }
+
+         final String clientName = _connectionDetails.getClientName();
+         if (clientName == null || clientName.equals(""))
+         {
+             _connectionDetails.setClientName(getUniqueClientID());
+         }
+         return new AMQConnection(_connectionDetails);
+     }
+     catch (Exception cause)
+     {
+         JMSException wrapped = new JMSException("Error creating connection: " + cause.getMessage());
+         wrapped.setLinkedException(cause);
+         throw wrapped;
+     }
+ }
+
+ /**
+  * Creates a connection using the supplied credentials instead of the defaults.
+  * <p>
+  * When this factory was configured with a connection URL, that URL (broker list, virtual
+  * host, client name) is used; previously it was ignored and the unset host/port fields
+  * were used instead. The supplied credentials are written to the factory's connection URL,
+  * in the same way {@link #setDefaultUsername(String)} and {@link #setDefaultPassword(String)}
+  * write through to it, and a client name is generated if the URL has none or an empty one.
+  * Without a connection URL the host, port and virtual path fields are used.
+  *
+  * @param userName the user name to connect with
+  * @param password the password to connect with
+  * @return the new connection
+  * @throws JMSException wrapping (as linked exception) any exception raised while creating the connection
+  */
+ public Connection createConnection(String userName, String password) throws JMSException
+ {
+     try
+     {
+         if (_connectionDetails != null)
+         {
+             _connectionDetails.setUsername(userName);
+             _connectionDetails.setPassword(password);
+
+             String clientName = _connectionDetails.getClientName();
+             if (clientName == null || clientName.equals(""))
+             {
+                 _connectionDetails.setClientName(getUniqueClientID());
+             }
+             return new AMQConnection(_connectionDetails);
+         }
+
+         return new AMQConnection(_host, _port, userName, password, getUniqueClientID(), _virtualPath);
+     }
+     catch (Exception e)
+     {
+         JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+         jmse.setLinkedException(e);
+         throw jmse;
+     }
+ }
+
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return (QueueConnection) createConnection();
+ }
+
+ public QueueConnection createQueueConnection(String username, String password) throws JMSException
+ {
+ return (QueueConnection) createConnection(username, password);
+ }
+
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ return (TopicConnection) createConnection();
+ }
+
+ public TopicConnection createTopicConnection(String username, String password) throws JMSException
+ {
+ return (TopicConnection) createConnection(username, password);
+ }
+
+
+ public ConnectionURL getConnectionURL()
+ {
+ return _connectionDetails;
+ }
+
+ /**
+  * JNDI object factory hook: rebuilds an object from a Reference.
+  * <p>
+  * The reference's class name selects what is built, and the string address stored under
+  * that same class name supplies the URL text: an AMQConnection or AMQConnectionFactory
+  * from a connection URL, an AMQQueue from the queue name of a binding URL, or an AMQTopic
+  * from the destination name of a binding URL.
+  *
+  * @param obj  the object from JNDI; only Reference instances are handled
+  * @param name not used by this implementation
+  * @param ctx  not used by this implementation
+  * @param env  not used by this implementation
+  * @return an AMQConnection, AMQQueue, AMQTopic or AMQConnectionFactory, or null when obj is
+  *         not a Reference, its class name is not one of those, or the expected address is missing
+  * @throws Exception if constructing the object fails
+  */
+ public Object getObjectInstance(Object obj, Name name, Context ctx,
+                                 Hashtable env) throws Exception
+ {
+     if (!(obj instanceof Reference))
+     {
+         return null;
+     }
+
+     final Reference ref = (Reference) obj;
+     final String className = ref.getClassName();
+
+     if (className.equals(AMQConnection.class.getName()))
+     {
+         final RefAddr connectionAddr = ref.get(AMQConnection.class.getName());
+         if (connectionAddr != null)
+         {
+             return new AMQConnection((String) connectionAddr.getContent());
+         }
+     }
+
+     if (className.equals(AMQQueue.class.getName()))
+     {
+         final RefAddr queueAddr = ref.get(AMQQueue.class.getName());
+         if (queueAddr != null)
+         {
+             final String bindingUrl = (String) queueAddr.getContent();
+             return new AMQQueue(new AMQBindingURL(bindingUrl).getQueueName());
+         }
+     }
+
+     if (className.equals(AMQTopic.class.getName()))
+     {
+         final RefAddr topicAddr = ref.get(AMQTopic.class.getName());
+         if (topicAddr != null)
+         {
+             final String bindingUrl = (String) topicAddr.getContent();
+             return new AMQTopic(new AMQBindingURL(bindingUrl).getDestinationName());
+         }
+     }
+
+     if (className.equals(AMQConnectionFactory.class.getName()))
+     {
+         final RefAddr factoryAddr = ref.get(AMQConnectionFactory.class.getName());
+         if (factoryAddr != null)
+         {
+             return new AMQConnectionFactory(new AMQConnectionURL((String) factoryAddr.getContent()));
+         }
+     }
+
+     return null;
+ }
+
+
+ /**
+  * Creates the JNDI reference for this factory. The reference carries the connection URL
+  * text as a string address keyed by the AMQConnectionFactory class name, and names
+  * AMQConnectionFactory as the object factory.
+  *
+  * @return the reference describing this factory
+  * @throws NamingException if this factory has no connection URL (for example when it was
+  *                         built from host/port values), since there is then no URL to store
+  */
+ public Reference getReference() throws NamingException
+ {
+     if (_connectionDetails == null)
+     {
+         // Previously this case failed with a NullPointerException.
+         throw new NamingException(
+                 "Cannot create a Reference: this connection factory has no connection URL");
+     }
+
+     return new Reference(
+             AMQConnectionFactory.class.getName(),
+             new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+             AMQConnectionFactory.class.getName(),
+             null); // factory location
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,399 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AMQConnectionURL implements ConnectionURL
+{
+ private String _url;
+ private String _failoverMethod;
+ private HashMap<String, String> _failoverOptions;
+ private HashMap<String, String> _options;
+ private List<BrokerDetails> _brokers;
+ private String _clientName;
+ private String _username;
+ private String _password;
+ private String _virtualHost;
+
+ /**
+  * Parses a connection URL string into its client name, user information, virtual host
+  * and options (broker list and failover settings).
+  * <p>
+  * The text is parsed with java.net.URI: the scheme must equal AMQ_PROTOCOL (ignoring case),
+  * the URI host becomes the client name (a generated ID is used when it is absent), the
+  * user information becomes user name and password, the path becomes the virtual host and
+  * the query is handed to URLHelper.parseOptions. Problems are reported through
+  * URLHelper.parseError (presumably by throwing URLSyntaxException - TODO confirm).
+  *
+  * @param fullURL the connection URL text
+  * @throws URLSyntaxException if the URL cannot be parsed or a required part is missing
+  */
+ public AMQConnectionURL(String fullURL) throws URLSyntaxException
+ {
+ _url = fullURL;
+ _options = new HashMap<String, String>();
+ _brokers = new LinkedList<BrokerDetails>();
+ _failoverOptions = new HashMap<String, String>();
+
+ // Connection URL format
+ //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+ // Options are of course optional except for requiring a single broker in the broker list.
+ try
+ {
+ URI connection = new URI(fullURL);
+
+ if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+ {
+ throw new URISyntaxException(fullURL, "Not an AMQP URL");
+ }
+
+ // The URI host position carries the client name; generate one when it is missing.
+ if (connection.getHost() == null || connection.getHost().equals(""))
+ {
+ String uid = AMQConnectionFactory.getUniqueClientID();
+ if (uid == null)
+ {
+ URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ }
+ else
+ {
+ setClientName(uid);
+ }
+
+ }
+ else
+ {
+ setClientName(connection.getHost());
+ }
+
+ String userInfo = connection.getUserInfo();
+
+ if (userInfo == null)
+ {
+ //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+ userInfo = connection.getAuthority();
+
+ if (userInfo != null)
+ {
+ // Take the part of the authority before '@' as the user information.
+ int atIndex = userInfo.indexOf('@');
+
+ if (atIndex != -1)
+ {
+ userInfo = userInfo.substring(0, atIndex);
+ }
+ else
+ {
+ userInfo = null;
+ }
+ }
+
+ }
+
+ if (userInfo == null)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+ "User information not found on url", fullURL);
+ }
+ else
+ {
+ parseUserInfo(userInfo);
+ }
+ String virtualHost = connection.getPath();
+
+ if (virtualHost != null && (!virtualHost.equals("")))
+ {
+ setVirtualHost(virtualHost);
+ }
+ else
+ {
+ // No path: check whether the query ('?') starts directly after the authority
+ // so the error can point at the authority text.
+ int authLength = connection.getAuthority().length();
+ int start = AMQ_PROTOCOL.length() + 3;
+ int testIndex = start + authLength;
+ if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+ {
+ URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ }
+
+ }
+
+
+ URLHelper.parseOptions(_options, connection.getQuery());
+
+ processOptions();
+
+ //Fragment is #string (not used)
+ //System.out.println(connection.getFragment());
+
+ }
+ catch (URISyntaxException uris)
+ {
+ // Errors already raised as URLSyntaxException are passed on unchanged.
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ // NOTE(review): this searches for a backslash character, although the messages
+ // below describe it as a forward slash - confirm the intended wording.
+ int slash = fullURL.indexOf("\\");
+
+ if (slash == -1)
+ {
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ else
+ {
+ // A ':' immediately before the backslash is treated as a Windows drive path (e.g. "c:\").
+ if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+ {
+ URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ }
+ }
+
+ }
+ }
+
+ private void parseUserInfo(String userinfo) throws URLSyntaxException
+ {
+ //user info = user:pass
+
+ int colonIndex = userinfo.indexOf(':');
+
+ if (colonIndex == -1)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ "Null password in user information not allowed.", _url);
+ }
+ else
+ {
+ setUsername(userinfo.substring(0, colonIndex));
+ setPassword(userinfo.substring(colonIndex + 1));
+ }
+
+ }
+
+ private void processOptions() throws URLSyntaxException
+ {
+ if (_options.containsKey(OPTIONS_BROKERLIST))
+ {
+ String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+ //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+ while (st.hasMoreTokens())
+ {
+ String broker = st.nextToken();
+
+ _brokers.add(new AMQBrokerDetails(broker));
+ }
+
+ _options.remove(OPTIONS_BROKERLIST);
+ }
+
+ if (_options.containsKey(OPTIONS_FAILOVER))
+ {
+ String failover = _options.get(OPTIONS_FAILOVER);
+
+ // failover='method?option='value',option='value''
+
+ int methodIndex = failover.indexOf('?');
+
+ if (methodIndex > -1)
+ {
+ _failoverMethod = failover.substring(0, methodIndex);
+ URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+ }
+ else
+ {
+ _failoverMethod = failover;
+ }
+
+ _options.remove(OPTIONS_FAILOVER);
+ }
+ }
+
+ public String getURL()
+ {
+ return _url;
+ }
+
+ public String getFailoverMethod()
+ {
+ return _failoverMethod;
+ }
+
+ public String getFailoverOption(String key)
+ {
+ return _failoverOptions.get(key);
+ }
+
+ public void setFailoverOption(String key, String value)
+ {
+ _failoverOptions.put(key, value);
+ }
+
+ public int getBrokerCount()
+ {
+ return _brokers.size();
+ }
+
+ /**
+  * Returns the broker at the given position in the broker list.
+  *
+  * @param index zero-based position in the broker list
+  * @return the broker details, or null when the index is outside the list
+  *         (negative, or not less than the number of brokers)
+  */
+ public BrokerDetails getBrokerDetails(int index)
+ {
+     // Treat negative indexes like any other out-of-range index instead of letting
+     // the list throw IndexOutOfBoundsException.
+     if (index >= 0 && index < _brokers.size())
+     {
+         return _brokers.get(index);
+     }
+     else
+     {
+         return null;
+     }
+ }
+
+ public void addBrokerDetails(BrokerDetails broker)
+ {
+ if (!(_brokers.contains(broker)))
+ {
+ _brokers.add(broker);
+ }
+ }
+
+ public List<BrokerDetails> getAllBrokerDetails()
+ {
+ return _brokers;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public void setClientName(String clientName)
+ {
+ _clientName = clientName;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(String username)
+ {
+ _username = username;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(String virtuaHost)
+ {
+ _virtualHost = virtuaHost;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(AMQ_PROTOCOL);
+ sb.append("://");
+
+ if (_username != null)
+ {
+ sb.append(_username);
+
+ if (_password != null)
+ {
+ sb.append(':');
+ sb.append(_password);
+ }
+
+ sb.append('@');
+ }
+
+ sb.append(_clientName);
+
+ sb.append(_virtualHost);
+
+ sb.append(optionsToString());
+
+ return sb.toString();
+ }
+
+ /**
+  * Renders the option part of the URL: the broker list, with brokers separated by ';',
+  * followed by the failover method and its options when a failover method is set.
+  *
+  * @return the option text, starting with '?'
+  */
+ private String optionsToString()
+ {
+     StringBuffer sb = new StringBuffer();
+
+     sb.append("?" + OPTIONS_BROKERLIST + "='");
+
+     // Write the separator between entries rather than appending one after every entry
+     // and deleting the last character: with an empty broker list that deletion removed
+     // the opening quote and produced an unbalanced value.
+     boolean first = true;
+     for (BrokerDetails service : _brokers)
+     {
+         if (!first)
+         {
+             sb.append(';');
+         }
+         sb.append(service.toString());
+         first = false;
+     }
+
+     sb.append("'");
+
+     if (_failoverMethod != null)
+     {
+         sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+         sb.append(OPTIONS_FAILOVER + "='");
+         sb.append(_failoverMethod);
+         sb.append(URLHelper.printOptions(_failoverOptions));
+         sb.append("'");
+     }
+
+     return sb.toString();
+ }
+
+
+ public static void main(String[] args) throws URLSyntaxException
+ {
+
+ String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+
+ ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
+
+ System.out.println(url2);
+ System.out.println(connectionurl2);
+
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,282 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+import javax.jms.Destination;
+
+
+public abstract class AMQDestination implements Destination, Referenceable
+{
+ protected final String _exchangeName;
+
+ protected final String _exchangeClass;
+
+ protected final String _destinationName;
+
+ protected boolean _isDurable;
+
+ protected final boolean _isExclusive;
+
+ protected final boolean _isAutoDelete;
+
+ protected String _queueName;
+
+ protected AMQDestination(String url) throws URLSyntaxException
+ {
+ this(new AMQBindingURL(url));
+ }
+
+ protected AMQDestination(BindingURL binding)
+ {
+ _exchangeName = binding.getExchangeName();
+ _exchangeClass = binding.getExchangeClass();
+ _destinationName = binding.getDestinationName();
+
+ _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+ _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+ _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _queueName = binding.getQueueName();
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+ {
+ this(exchangeName, exchangeClass, destinationName, false, false, queueName);
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+ {
+ this(exchangeName, exchangeClass, destinationName, false, false, null);
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
+ boolean isAutoDelete, String queueName)
+ {
+ if (destinationName == null)
+ {
+ throw new IllegalArgumentException("Destination name must not be null");
+ }
+ if (exchangeName == null)
+ {
+ throw new IllegalArgumentException("Exchange name must not be null");
+ }
+ if (exchangeClass == null)
+ {
+ throw new IllegalArgumentException("Exchange class must not be null");
+ }
+ _exchangeName = exchangeName;
+ _exchangeClass = exchangeClass;
+ _destinationName = destinationName;
+ _isExclusive = isExclusive;
+ _isAutoDelete = isAutoDelete;
+ _queueName = queueName;
+ }
+
+ public abstract String getEncodedName();
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public String getExchangeClass()
+ {
+ return _exchangeClass;
+ }
+
+ public boolean isTopic()
+ {
+ return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName);
+ }
+
+ public boolean isQueue()
+ {
+ return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
+ }
+
+ public String getDestinationName()
+ {
+ return _destinationName;
+ }
+
+ public String getQueueName()
+ {
+ return _queueName;
+ }
+
+ public void setQueueName(String queueName)
+ {
+ _queueName = queueName;
+ }
+
+ public abstract String getRoutingKey();
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public abstract boolean isNameRequired();
+
+ public String toString()
+ {
+ return toURL();
+
+ /*
+ return "Destination: " + _destinationName + ", " +
+ "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
+ ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
+ ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey();
+ */
+ }
+
+ /**
+  * Renders this destination as URL text of the form
+  * {@code exchangeClass://exchangeName/destinationName/queueName?options}, where the
+  * options are the durable, exclusive and auto-delete flags that are set, each written
+  * as {@code name='true'}. A null destination or queue name is written as empty text.
+  *
+  * @return the URL text for this destination
+  */
+ public String toURL()
+ {
+     final StringBuffer url = new StringBuffer();
+
+     url.append(_exchangeClass).append("://").append(_exchangeName);
+
+     url.append("/");
+     if (_destinationName != null)
+     {
+         url.append(_destinationName);
+     }
+
+     url.append("/");
+     if (_queueName != null)
+     {
+         url.append(_queueName);
+     }
+
+     url.append("?");
+
+     if (_isDurable)
+     {
+         url.append(BindingURL.OPTION_DURABLE).append("='true'").append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+     }
+
+     if (_isExclusive)
+     {
+         url.append(BindingURL.OPTION_EXCLUSIVE).append("='true'").append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+     }
+
+     if (_isAutoDelete)
+     {
+         url.append(BindingURL.OPTION_AUTODELETE).append("='true'").append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+     }
+
+     // Drop the final character: the '?' when no option was written, otherwise the
+     // separator that follows the last option.
+     url.deleteCharAt(url.length() - 1);
+
+     return url.toString();
+ }
+
+ /**
+  * Two destinations are equal when they are of exactly the same class and have the same
+  * destination name, exchange class, exchange name, queue name (both null counts as the
+  * same), exclusive flag and auto-delete flag. The durable flag is not compared.
+  *
+  * @param o the object to compare with
+  * @return true if o is an equal destination
+  */
+ public boolean equals(Object o)
+ {
+     if (this == o)
+     {
+         return true;
+     }
+     if (o == null || getClass() != o.getClass())
+     {
+         return false;
+     }
+
+     final AMQDestination other = (AMQDestination) o;
+
+     return _destinationName.equals(other._destinationName)
+            && _exchangeClass.equals(other._exchangeClass)
+            && _exchangeName.equals(other._exchangeName)
+            && (_queueName == null ? other._queueName == null : _queueName.equals(other._queueName))
+            && _isExclusive == other._isExclusive
+            && _isAutoDelete == other._isAutoDelete;
+ }
+
+ public int hashCode()
+ {
+ int result;
+ result = _exchangeName.hashCode();
+ result = 29 * result + _exchangeClass.hashCode();
+ result = 29 * result + _destinationName.hashCode();
+ if (_queueName != null)
+ {
+ result = 29 * result + _queueName.hashCode();
+ }
+ result = result * (_isExclusive ? 13 : 7);
+ result = result * (_isAutoDelete ? 13 : 7);
+ return result;
+ }
+
+ /**
+  * Creates the JNDI reference for this destination. The reference carries the result of
+  * {@link #toURL()} as a string address keyed by this object's runtime class name, and
+  * names AMQConnectionFactory as the object factory.
+  *
+  * @return the reference describing this destination
+  * @throws NamingException declared by the Referenceable contract
+  */
+ public Reference getReference() throws NamingException
+ {
+     final String destinationClass = this.getClass().getName();
+     final StringRefAddr address = new StringRefAddr(destinationClass, toURL());
+     final String factoryClass = AMQConnectionFactory.class.getName();
+
+     // The last argument is the factory location, which is not supplied.
+     return new Reference(destinationClass, address, factoryClass, null);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+/**
+ * A destination backed by a headers exchange
+ */
+public class AMQHeadersExchange extends AMQDestination
+{
+ /**
+  * Creates a headers exchange destination. The given name is passed to the superclass
+  * as both the exchange name and the destination name, the exchange class is
+  * ExchangeDefaults.HEADERS_EXCHANGE_CLASS, the exclusive and auto-delete flags are
+  * both true and no queue name is set.
+  *
+  * @param queueName the name used for the exchange and the destination
+  */
+ public AMQHeadersExchange(String queueName)
+ {
+ super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
+ }
+
+ /**
+  * @return the destination name
+  */
+ public String getEncodedName()
+ {
+ return getDestinationName();
+ }
+
+ /**
+  * @return the destination name, which is used as the routing key
+  */
+ public String getRoutingKey()
+ {
+ return getDestinationName();
+ }
+
+ /**
+  * @return always false
+  */
+ public boolean isNameRequired()
+ {
+ //Not sure what the best approach is here, probably to treat this like a topic
+ //and allow server to generate names. As it is AMQ specific it doesn't need to
+ //fit the JMS API expectations so this is not as yet critical.
+ return false;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.protocol.AMQConstant;
+
+
+/**
+ * An AMQUndeliveredException created with the AMQConstant.NO_CONSUMERS code.
+ */
+public class AMQNoConsumersException extends AMQUndeliveredException
+{
+ /**
+  * @param msg     the exception message
+  * @param bounced passed through to the superclass (presumably the undelivered message - TODO confirm)
+  */
+ public AMQNoConsumersException(String msg, Object bounced)
+ {
+ super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced);
+ }
+
+
+}
+
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.protocol.AMQConstant;
+
+
+/**
+ * An AMQUndeliveredException created with the AMQConstant.NO_ROUTE code.
+ */
+public class AMQNoRouteException extends AMQUndeliveredException
+{
+ /**
+  * @param msg     the exception message
+  * @param bounced passed through to the superclass (presumably the undelivered message - TODO confirm)
+  */
+ public AMQNoRouteException(String msg, Object bounced)
+ {
+ super(AMQConstant.NO_ROUTE.getCode(), msg, bounced);
+ }
+
+
+}
+
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.jms.Queue;
+
+/**
+ * JMS {@code Queue} destination of the AMQ client.
+ * <p>
+ * Queues created from names are set up against the direct exchange
+ * ({@code ExchangeDefaults.DIRECT_EXCHANGE_NAME} / {@code ExchangeDefaults.DIRECT_EXCHANGE_CLASS}).
+ * Constructing an instance only creates a reference; it does not imply that the queue exists.
+ */
+public class AMQQueue extends AMQDestination implements Queue
+{
+    /**
+     * Creates a reference to a non-temporary queue described by a BindingURL.
+     * The queue does not have to exist.
+     *
+     * @param binding the BindingURL object, handed unchanged to the superclass constructor
+     */
+    public AMQQueue(BindingURL binding)
+    {
+        super(binding);
+    }
+
+    /**
+     * Creates a reference to a non-temporary queue. The queue does not have to exist.
+     *
+     * @param name the name of the queue
+     */
+    public AMQQueue(String name)
+    {
+        this(name, false);
+    }
+
+    /**
+     * Creates a queue reference with the given name, optionally as a temporary queue.
+     * <p>
+     * A temporary queue is exclusive, auto-deleted and not durable, and its queue name is left
+     * {@code null} so that a name gets generated for it. A non-temporary queue is durable and
+     * uses {@code name} as its queue name. Temporary queues are typically used as response queues.
+     * <p>
+     * NOTE(review): the earlier comment here said the broker assigns the name of a temporary
+     * queue, while {@link #isNameRequired()} says the client generates it; confirm which holds.
+     *
+     * @param name      the destination name; also the queue name unless {@code temporary} is true
+     * @param temporary if true the queue name is left unset and the queue is auto-deleted and
+     *                  exclusive
+     */
+    public AMQQueue(String name, boolean temporary)
+    {
+        this(name, queueNameFor(name, temporary), temporary, temporary);
+        _isDurable = !temporary;
+    }
+
+    /**
+     * Creates a reference to a queue on the direct exchange. The queue does not have to exist.
+     *
+     * @param destinationName the destination name passed to the superclass
+     * @param queueName       the queue name
+     * @param exclusive       true if the queue should only permit a single consumer
+     * @param autoDelete      true if the queue should be deleted automatically when the last
+     *                        consumer detaches
+     */
+    public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+    {
+        super(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
+              ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+              destinationName,
+              exclusive,
+              autoDelete,
+              queueName);
+    }
+
+    /**
+     * Returns the queue name prefixed with the character {@code 'Q'}.
+     *
+     * @return {@code 'Q'} followed by the value of {@code getQueueName()}
+     */
+    public String getEncodedName()
+    {
+        final String queueName = getQueueName();
+        return 'Q' + queueName;
+    }
+
+    /**
+     * Returns the routing key of this queue, which is its queue name.
+     *
+     * @return the value of {@code getQueueName()}
+     */
+    public String getRoutingKey()
+    {
+        final String queueName = getQueueName();
+        return queueName;
+    }
+
+    /**
+     * Tells whether a name still has to be generated for this queue.
+     *
+     * @return true if the queue name is {@code null}
+     */
+    public boolean isNameRequired()
+    {
+        // If the name is null, we require one to be generated by the client so that it will
+        // remain valid if we failover (see BLZ-24)
+        final String queueName = getQueueName();
+        return queueName == null;
+    }
+
+    /**
+     * Chooses the queue name used by the (name, temporary) constructor:
+     * {@code null} for a temporary queue, otherwise the given name.
+     */
+    private static String queueNameFor(String name, boolean temporary)
+    {
+        if (temporary)
+        {
+            return null;
+        }
+        return name;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java
------------------------------------------------------------------------------
svn:eol-style = native