You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [3/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,2222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.InvalidDestinationException;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.management.JMSConnectionStatsImpl;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.state.CommandVisitorAdapter;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.advisory.DestinationSource;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection, IConnection {
+
+    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
+    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
+    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+
+    private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
+
+    protected boolean dispatchAsync=true;
+    protected boolean alwaysSessionAsync = true;
+
+    private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
+    private final ThreadPoolExecutor asyncConnectionThread;
+
+    // Connection state variables
+    private final ConnectionInfo info;
+    private ExceptionListener exceptionListener;
+    private ClientInternalExceptionListener clientInternalExceptionListener;
+    private boolean clientIDSet;
+    private boolean isConnectionInfoSentToBroker;
+    private boolean userSpecifiedClientID;
+
+    // Configuration options variables
+    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+    private BlobTransferPolicy blobTransferPolicy;
+    private RedeliveryPolicy redeliveryPolicy;
+    private MessageTransformer transformer;
+
+    private boolean disableTimeStampsByDefault;
+    private boolean optimizedMessageDispatch = true;
+    private boolean copyMessageOnSend = true;
+    private boolean useCompression;
+    private boolean objectMessageSerializationDefered;
+    private boolean useAsyncSend;
+    private boolean optimizeAcknowledge;
+    private boolean nestedMapAndListEnabled = true;
+    private boolean useRetroactiveConsumer;
+    private boolean exclusiveConsumer;
+    private boolean alwaysSyncSend;
+    private int closeTimeout = 15000;
+    private boolean watchTopicAdvisories = true;
+    private long warnAboutUnstartedConnectionTimeout = 500L;
+    private int sendTimeout =0;
+    private boolean sendAcksAsync=true;
+
+    private final Transport transport;
+    private final IdGenerator clientIdGenerator;
+    private final JMSStatsImpl factoryStats;
+    private final JMSConnectionStatsImpl stats;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean closing = new AtomicBoolean(false);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
+    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
+    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
+    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
+    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
+    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
+
+    // Maps ConsumerIds to ActiveMQDispatcher objects
+    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
+    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
+    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
+    private final SessionId connectionSessionId;
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
+
+    private AdvisoryConsumer advisoryConsumer;
+    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
+    private BrokerInfo brokerInfo;
+    private IOException firstFailureError;
+    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+    // Assume that protocol is the latest. Change to the actual protocol
+    // version when a WireFormatInfo is received.
+    private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+    private long timeCreated;
+    private ConnectionAudit connectionAudit = new ConnectionAudit();
+    private DestinationSource destinationSource;
+    private final Object ensureConnectionInfoSentMutex = new Object();
+
+    /**
+     * Construct an <code>ActiveMQConnection</code>.
+     * <p>
+     * Besides storing its arguments, the constructor creates the connection's
+     * {@link ConnectionInfo} with a freshly generated connection id, registers
+     * this connection as the transport's listener, and adds it to the supplied
+     * factory statistics.
+     * 
+     * @param transport the transport this connection registers itself on as
+     *                listener
+     * @param clientIdGenerator generator kept in the <code>clientIdGenerator</code>
+     *                field of this connection
+     * @param factoryStats statistics holder that this connection is added to
+     * @throws Exception declared for failures raised while setting up the
+     *                 connection
+     */
+    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+
+        this.transport = transport;
+        this.clientIdGenerator = clientIdGenerator;
+        this.factoryStats = factoryStats;
+
+        // Configure a single threaded executor (core == max == 1) backed by an
+        // unbounded queue. Its worker thread is a daemon thread named after
+        // the transport.
+        // NOTE(review): the 5 second keep-alive only lets the core thread time
+        // out when idle if allowCoreThreadTimeOut(true) is called; that call is
+        // commented out below, so the idle timeout looks ineffective - confirm
+        // whether this is intentional.
+        asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+        // asyncConnectionThread.allowCoreThreadTimeOut(true);
+
+        // Connection identity: a newly generated connection id, flagged as
+        // manageable. The connection-level session id uses the value -1.
+        this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
+        this.info.setManageable(true);
+        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
+
+        // From here on the transport holds a reference to this (not yet fully
+        // constructed) connection as its listener.
+        this.transport.setTransportListener(this);
+
+        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
+        this.factoryStats.addConnection(this);
+        this.timeCreated = System.currentTimeMillis();
+        // Duplicate checking in the connection audit follows the transport's
+        // fault-tolerant flag.
+        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
+    }
+
+    /**
+     * Records the user name on this connection's {@link ConnectionInfo}.
+     *
+     * @param userName the user name to store
+     */
+    protected void setUserName(String userName) {
+        info.setUserName(userName);
+    }
+
+    /**
+     * Records the password on this connection's {@link ConnectionInfo}.
+     *
+     * @param password the password to store
+     */
+    protected void setPassword(String password) {
+        info.setPassword(password);
+    }
+
+    /**
+     * Static convenience method: creates a connection from an
+     * {@link ActiveMQConnectionFactory} built with its no-argument
+     * constructor.
+     * 
+     * @return the newly created ActiveMQConnection
+     * @throws JMSException if the factory fails to create the connection
+     */
+    public static ActiveMQConnection makeConnection() throws JMSException {
+        return (ActiveMQConnection)new ActiveMQConnectionFactory().createConnection();
+    }
+
+    /**
+     * Static convenience method: creates a connection from an
+     * {@link ActiveMQConnectionFactory} built for the given URI string.
+     * 
+     * @param uri the URI string handed to the connection factory
+     * @return the newly created ActiveMQConnection
+     * @throws JMSException if the factory fails to create the connection
+     * @throws URISyntaxException declared for an invalid <code>uri</code>
+     */
+    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
+        return (ActiveMQConnection)new ActiveMQConnectionFactory(uri).createConnection();
+    }
+
+    /**
+     * Static convenience method: creates a connection from an
+     * {@link ActiveMQConnectionFactory} built with the given credentials and
+     * URI.
+     * 
+     * @param user the user name handed to the connection factory
+     * @param password the password handed to the connection factory
+     * @param uri the URI string, parsed with {@link URI#URI(String)}
+     * @return the newly created ActiveMQConnection
+     * @throws JMSException if the factory fails to create the connection
+     * @throws URISyntaxException if <code>uri</code> cannot be parsed as a URI
+     */
+    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
+        URI brokerUri = new URI(uri);
+        return (ActiveMQConnection)new ActiveMQConnectionFactory(user, password, brokerUri).createConnection();
+    }
+
+    /**
+     * @return the statistics object created for this connection
+     */
+    public JMSConnectionStatsImpl getConnectionStats() {
+        return this.stats;
+    }
+
+    /**
+     * Creates a <CODE>Session</CODE> object.
+     * 
+     * @param transacted indicates whether the session is transacted
+     * @param acknowledgeMode indicates whether the consumer or the client will
+     *                acknowledge any messages it receives; ignored if the
+     *                session is transacted. Legal values are
+     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
+     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return a newly created session
+     * @throws JMSException if the <CODE>Connection</CODE> object fails to
+     *                 create a session due to some internal error or lack of
+     *                 support for the specific transaction and acknowledgement
+     *                 mode.
+     * @see Session#AUTO_ACKNOWLEDGE
+     * @see Session#CLIENT_ACKNOWLEDGE
+     * @see Session#DUPS_OK_ACKNOWLEDGE
+     * @since 1.1
+     */
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        if(!transacted) {
+            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
+                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
+                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
+                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
+            }
+        }
+        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
+            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
+    }
+
+    /**
+     * Builds the id for the next session of this connection.
+     * 
+     * @return a SessionId made of this connection's id and the next value of
+     *         the session id sequence
+     */
+    protected SessionId getNextSessionId() {
+        ConnectionId connectionId = info.getConnectionId();
+        return new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
+    }
+
+    /**
+     * Gets the client identifier for this connection.
+     * <P>
+     * This value is specific to the JMS provider. It is either preconfigured by
+     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
+     * dynamically by the application by calling the <code>setClientID</code>
+     * method.
+     * 
+     * @return the unique client identifier
+     * @throws JMSException if the JMS provider fails to return the client ID
+     *                 for this connection due to some internal error.
+     */
+    public String getClientID() throws JMSException {
+        checkClosedOrFailed();
+        return this.info.getClientId();
+    }
+
+    /**
+     * Sets the client identifier for this connection.
+     * <P>
+     * The preferred way to assign a JMS client's client identifier is for it to
+     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
+     * object and transparently assigned to the <CODE>Connection</CODE> object
+     * it creates.
+     * <P>
+     * Alternatively, a client can set a connection's client identifier using a
+     * provider-specific value. The facility to set a connection's client
+     * identifier explicitly is not a mechanism for overriding the identifier
+     * that has been administratively configured. It is provided for the case
+     * where no administratively specified identifier exists. If one does exist,
+     * an attempt to change it by setting it must throw an
+     * <CODE>IllegalStateException</CODE>. If a client sets the client
+     * identifier explicitly, it must do so immediately after it creates the
+     * connection and before any other action on the connection is taken. After
+     * this point, setting the client identifier is a programming error that
+     * should throw an <CODE>IllegalStateException</CODE>.
+     * <P>
+     * The purpose of the client identifier is to associate a connection and its
+     * objects with a state maintained on behalf of the client by a provider.
+     * The only such state identified by the JMS API is that required to support
+     * durable subscriptions.
+     * <P>
+     * If another connection with the same <code>clientID</code> is already
+     * running when this method is called, the JMS provider should detect the
+     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
+     * 
+     * @param newClientID the unique client identifier
+     * @throws JMSException if the JMS provider fails to set the client ID for
+     *                 this connection due to some internal error.
+     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
+     *                 invalid or duplicate client ID.
+     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
+     *                 a connection's client ID at the wrong time or when it has
+     *                 been administratively configured.
+     */
+    public void setClientID(String newClientID) throws JMSException {
+        checkClosedOrFailed();
+
+        // The client id may only be assigned before it was set and before the
+        // connection info has been sent to the broker.
+        if (clientIDSet) {
+            throw new IllegalStateException("The clientID has already been set");
+        }
+        if (isConnectionInfoSentToBroker) {
+            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
+        }
+
+        info.setClientId(newClientID);
+        userSpecifiedClientID = true;
+
+        // Send the connection info to the broker right away.
+        ensureConnectionInfoSent();
+    }
+
+    /**
+     * Sets the default client id that the connection uses when none is set
+     * explicitly through {@link #setClientID(String)}. The id is stored on the
+     * connection info and marked as user specified.
+     * 
+     * @param clientID the default client id
+     * @throws JMSException declared by the signature
+     */
+    public void setDefaultClientID(String clientID) throws JMSException {
+        info.setClientId(clientID);
+        userSpecifiedClientID = true;
+    }
+
+    /**
+     * Gets the metadata for this connection.
+     * 
+     * @return the connection metadata
+     * @throws JMSException if the JMS provider fails to get the connection
+     *                 metadata for this connection.
+     * @see javax.jms.ConnectionMetaData
+     */
+    public ConnectionMetaData getMetaData() throws JMSException {
+        checkClosedOrFailed();
+        return ActiveMQConnectionMetaData.INSTANCE;
+    }
+
+    /**
+     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
+     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
+     * associated with it.
+     * 
+     * @return the <CODE>ExceptionListener</CODE> for this connection, or
+     *         null, if no <CODE>ExceptionListener</CODE> is associated with
+     *         this connection.
+     * @throws JMSException if the JMS provider fails to get the
+     *                 <CODE>ExceptionListener</CODE> for this connection.
+     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
+     */
+    public ExceptionListener getExceptionListener() throws JMSException {
+        checkClosedOrFailed();
+        return this.exceptionListener;
+    }
+
+    /**
+     * Sets an exception listener for this connection.
+     * <P>
+     * If a JMS provider detects a serious problem with a connection, it informs
+     * the connection's <CODE> ExceptionListener</CODE>, if one has been
+     * registered. It does this by calling the listener's <CODE>onException
+     * </CODE>
+     * method, passing it a <CODE>JMSException</CODE> object describing the
+     * problem.
+     * <P>
+     * An exception listener allows a client to be notified of a problem
+     * asynchronously. Some connections only consume messages, so they would
+     * have no other way to learn their connection has failed.
+     * <P>
+     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
+     * <P>
+     * A JMS provider should attempt to resolve connection problems itself
+     * before it notifies the client of them.
+     * 
+     * @param listener the exception listener
+     * @throws JMSException if the JMS provider fails to set the exception
+     *                 listener for this connection.
+     */
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        // Fails first if the connection is closed or its transport has failed.
+        checkClosedOrFailed();
+        exceptionListener = listener;
+    }
+
+    /**
+     * Gets the <code>ClientInternalExceptionListener</code> registered on this
+     * connection. Not every <CODE>ActiveMQConnection</CODE> has one.
+     * 
+     * @return the listener, or <code>null</code> if none is registered with
+     *         the connection
+     */
+    public ClientInternalExceptionListener getClientInternalExceptionListener() {
+        return this.clientInternalExceptionListener;
+    }
+
+    /**
+     * Registers a client internal exception listener on this connection.
+     * <P>
+     * The connection notifies the listener, if one has been registered, of
+     * exceptions thrown by container components (e.g. an EJB container in
+     * case of Message Driven Beans) during asynchronous processing of a
+     * message. It does so by calling the listener's <code>onException()</code>
+     * method with a <code>Throwable</code> describing the problem.
+     * 
+     * @param listener the exception listener
+     */
+    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
+        clientInternalExceptionListener = listener;
+    }
+    
+    /**
+     * Starts (or restarts) a connection's delivery of incoming messages. A call
+     * to <CODE>start</CODE> on a connection that has already been started is
+     * ignored.
+     * 
+     * @throws JMSException if the JMS provider fails to start message delivery
+     *                 due to some internal error.
+     * @see javax.jms.Connection#stop()
+     */
+    public void start() throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        // Only the call that flips the flag from stopped to started starts the
+        // sessions; any other call is ignored.
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+        for (ActiveMQSession session : sessions) {
+            session.start();
+        }
+    }
+
+    /**
+     * Temporarily stops a connection's delivery of incoming messages. Delivery
+     * can be restarted using the connection's <CODE>start</CODE> method. When
+     * the connection is stopped, delivery to all the connection's message
+     * consumers is inhibited: synchronous receives block, and messages are not
+     * delivered to message listeners.
+     * <P>
+     * This call blocks until receives and/or message listeners in progress have
+     * completed.
+     * <P>
+     * Stopping a connection has no effect on its ability to send messages. A
+     * call to <CODE>stop</CODE> on a connection that has already been stopped
+     * is ignored.
+     * <P>
+     * A call to <CODE>stop</CODE> must not return until delivery of messages
+     * has paused. This means that a client can rely on the fact that none of
+     * its message listeners will be called and that all threads of control
+     * waiting for <CODE>receive</CODE> calls to return will not return with a
+     * message until the connection is restarted. The receive timers for a
+     * stopped connection continue to advance, so receives may time out while
+     * the connection is stopped.
+     * <P>
+     * If message listeners are running when <CODE>stop</CODE> is invoked, the
+     * <CODE>stop</CODE> call must wait until all of them have returned before
+     * it may return. While these message listeners are completing, they must
+     * have the full services of the connection available to them.
+     * 
+     * @throws JMSException if the JMS provider fails to stop message delivery
+     *                 due to some internal error.
+     * @see javax.jms.Connection#start()
+     */
+    public void stop() throws JMSException {
+        checkClosedOrFailed();
+
+        // Only the call that flips the flag from started to stopped stops the
+        // sessions; any other call is ignored.
+        if (!started.compareAndSet(true, false)) {
+            return;
+        }
+        synchronized(sessions) {
+            for (ActiveMQSession session : sessions) {
+                session.stop();
+            }
+        }
+    }
+
+    /**
+     * Closes the connection.
+     * <P>
+     * Since a provider typically allocates significant resources outside the
+     * JVM on behalf of a connection, clients should close these resources when
+     * they are not needed. Relying on garbage collection to eventually reclaim
+     * these resources may not be timely enough.
+     * <P>
+     * There is no need to close the sessions, producers, and consumers of a
+     * closed connection.
+     * <P>
+     * Closing a connection causes all temporary destinations to be deleted.
+     * <P>
+     * When this method is invoked, it should not return until message
+     * processing has been shut down in an orderly fashion. This means that all
+     * message listeners that may have been running have returned, and that all
+     * pending receives have returned. A close terminates all pending message
+     * receives on the connection's sessions' consumers. The receives may return
+     * with a message or with null, depending on whether there was a message
+     * available at the time of the close. If one or more of the connection's
+     * sessions' message listeners is processing a message at the time when
+     * connection <CODE>close</CODE> is invoked, all the facilities of the
+     * connection and its sessions must remain available to those listeners
+     * until they return control to the JMS provider.
+     * <P>
+     * Closing a connection causes any of its sessions' transactions in progress
+     * to be rolled back. In the case where a session's work is coordinated by
+     * an external transaction manager, a session's <CODE>commit</CODE> and
+     * <CODE> rollback</CODE> methods are not used and the result of a closed
+     * session's work is determined later by the transaction manager. Closing a
+     * connection does NOT force an acknowledgment of client-acknowledged
+     * sessions.
+     * <P>
+     * Invoking the <CODE>acknowledge</CODE> method of a received message from
+     * a closed connection's session must throw an
+     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
+     * NOT throw an exception.
+     * 
+     * @throws JMSException if the JMS provider fails to close the connection
+     *                 due to some internal error. For example, a failure to
+     *                 release resources or to close a socket connection can
+     *                 cause this exception to be thrown.
+     */
+    public void close() throws JMSException {
+        try {
+            // If we were running, lets stop first.
+            if (!closed.get() && !transportFailed.get()) {
+                stop();
+            }
+
+            synchronized (this) {
+                if (!closed.get()) {
+                    closing.set(true);
+
+                    if (destinationSource != null) {
+                        destinationSource.stop();
+                        destinationSource = null;
+                    }
+                    if (advisoryConsumer != null) {
+                        advisoryConsumer.dispose();
+                        advisoryConsumer = null;
+                    }
+
+                    // Dispose every session and remember the highest
+                    // last-delivered sequence id seen across them.
+                    long lastDeliveredSequenceId = 0;
+                    for (ActiveMQSession s : this.sessions) {
+                        s.dispose();
+                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
+                    }
+                    for (ActiveMQConnectionConsumer c : this.connectionConsumers) {
+                        c.dispose();
+                    }
+                    for (ActiveMQInputStream c : this.inputStreams) {
+                        c.dispose();
+                    }
+                    for (ActiveMQOutputStream c : this.outputStreams) {
+                        c.dispose();
+                    }
+
+                    if (isConnectionInfoSentToBroker) {
+                        // If we announced ourselves to the broker, try to let
+                        // the broker know that the connection is being
+                        // shut down.
+                        RemoveInfo removeCommand = info.createRemoveCommand();
+                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+                        // Send the command configured above. Sending a second,
+                        // freshly created remove command would drop the
+                        // last-delivered sequence id.
+                        doSyncSendPacket(removeCommand, closeTimeout);
+                        doAsyncSendPacket(new ShutdownInfo());
+                    }
+
+                    ServiceSupport.dispose(this.transport);
+
+                    started.set(false);
+
+                    // TODO if we move the TaskRunnerFactory to the connection
+                    // factory
+                    // then we may need to call
+                    // factory.onConnectionClose(this);
+                    sessionTaskRunner.shutdown();
+                    closed.set(true);
+                    closing.set(false);
+                }
+            }
+        } finally {
+            // Always release the worker executor and deregister from the
+            // factory statistics, even when the orderly shutdown above failed.
+            try {
+                if (asyncConnectionThread != null){
+                    asyncConnectionThread.shutdown();
+                }
+            }catch(Throwable e) {
+                LOG.error("Error shutting down thread pool " + e,e);
+            }
+            factoryStats.removeConnection(this);
+        }
+    }
+
+    /**
+     * Tells the broker to terminate its VM. This can be used to cleanly
+     * terminate a broker running in a standalone java process. Server must have
+     * property enable.vm.shutdown=true defined to allow this to work.
+     */
+    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
+    // implemented.
+    /*
+     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
+     * command = new BrokerAdminCommand();
+     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
+     * asyncSendPacket(command); }
+     */
+
+    /**
+     * Creates a durable connection consumer for this connection (optional
+     * operation). This is an expert facility not used by regular JMS clients.
+     * Delegates to the six-argument overload with <CODE>noLocal</CODE> set to
+     * <CODE>false</CODE>.
+     *
+     * @param topic topic to access
+     * @param subscriptionName durable subscription name
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param sessionPool the server session pool to associate with this durable
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time
+     * @return the durable connection consumer
+     * @throws JMSException if the <CODE>Connection</CODE> object fails to
+     *                 create a connection consumer due to some internal error
+     *                 or invalid arguments for <CODE>sessionPool</CODE> and
+     *                 <CODE>messageSelector</CODE>.
+     * @throws javax.jms.InvalidDestinationException if an invalid destination
+     *                 is specified.
+     * @throws javax.jms.InvalidSelectorException if the message selector is
+     *                 invalid.
+     * @see javax.jms.ConnectionConsumer
+     * @since 1.1
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
+        throws JMSException {
+        final boolean noLocal = false;
+        return createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, noLocal);
+    }
+
+    /**
+     * Create a durable connection consumer for this connection (optional
+     * operation). This is an expert facility not used by regular JMS clients.
+     * <p>
+     * Any <CODE>consumer.</CODE> prefixed options carried by the destination
+     * are applied to the consumer's <CODE>ConsumerInfo</CODE>.
+     *
+     * @param topic topic to access
+     * @param subscriptionName durable subscription name
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param sessionPool the server session pool to associate with this durable
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time; used as the prefetch size
+     * @param noLocal set true if you want to filter out messages published
+     *                locally
+     * @return the durable connection consumer
+     * @throws JMSException if the <CODE>Connection</CODE> object fails to
+     *                 create a connection consumer due to some internal error
+     *                 or invalid arguments for <CODE>sessionPool</CODE> and
+     *                 <CODE>messageSelector</CODE>.
+     * @throws javax.jms.InvalidDestinationException if an invalid destination
+     *                 is specified.
+     * @throws javax.jms.InvalidSelectorException if the message selector is
+     *                 invalid.
+     * @see javax.jms.ConnectionConsumer
+     * @since 1.1
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
+                                                              boolean noLocal) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
+        // Named consumerInfo (not info) so the connection's own ConnectionInfo
+        // field is no longer shadowed by the local.
+        ConsumerInfo consumerInfo = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
+        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
+        consumerInfo.setSubscriptionName(subscriptionName);
+        consumerInfo.setSelector(messageSelector);
+        consumerInfo.setPrefetchSize(maxMessages);
+        // Honour the documented noLocal flag, as the non-durable variant does.
+        consumerInfo.setNoLocal(noLocal);
+        consumerInfo.setDispatchAsync(isDispatchAsync());
+
+        // Allows the options on the destination to configure the consumerInfo
+        // (previously these were applied to the connection's ConnectionInfo).
+        if (consumerInfo.getDestination().getOptions() != null) {
+            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
+            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
+        }
+
+        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    /**
+     * Reports whether this connection is currently started.
+     *
+     * @return the current value of the <CODE>started</CODE> flag
+     */
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
+    /**
+     * Reports whether this connection has been closed.
+     *
+     * @return the current value of the <CODE>closed</CODE> flag
+     */
+    public boolean isClosed() {
+        return this.closed.get();
+    }
+
+    /**
+     * Reports whether this connection is in the process of being closed.
+     *
+     * @return the current value of the <CODE>closing</CODE> flag
+     */
+    public boolean isClosing() {
+        return this.closing.get();
+    }
+
+    /**
+     * Reports whether the underlying transport has failed.
+     *
+     * @return the current value of the <CODE>transportFailed</CODE> flag
+     */
+    public boolean isTransportFailed() {
+        return this.transportFailed.get();
+    }
+
+    /**
+     * Gets the prefetch policy configured on this connection.
+     *
+     * @return the prefetch policy
+     */
+    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return this.prefetchPolicy;
+    }
+
+    /**
+     * Sets the <a
+     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
+     * policy</a> for consumers created by this connection.
+     */
+    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy;
+    }
+
+    /**
+     * Gets the transport over which this connection sends its commands.
+     *
+     * @return the transport held by this connection
+     */
+    public Transport getTransportChannel() {
+        return this.transport;
+    }
+
+    /**
+     * @return Returns the clientID of the connection, forcing one to be
+     *         generated if one has not yet been configured.
+     */
+    public String getInitializedClientID() throws JMSException {
+        ensureConnectionInfoSent();
+        return info.getClientId();
+    }
+
+    /**
+     * Reports whether message timestamps are disabled by default.
+     *
+     * @return the disableTimeStampsByDefault flag
+     */
+    public boolean isDisableTimeStampsByDefault() {
+        return this.disableTimeStampsByDefault;
+    }
+
+    /**
+     * Sets whether or not timestamps on messages should be disabled or not. If
+     * you disable them it adds a small performance boost.
+     */
+    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
+        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
+    }
+
+    /**
+     * Reports whether optimized message dispatch is enabled.
+     *
+     * @return the optimizedMessageDispatch flag
+     */
+    public boolean isOptimizedMessageDispatch() {
+        return this.optimizedMessageDispatch;
+    }
+
+    /**
+     * If this flag is set then an larger prefetch limit is used - only
+     * applicable for durable topic subscribers.
+     */
+    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
+        this.optimizedMessageDispatch = dispatchOptimizedMessage;
+    }
+
+    /**
+     * Gets the timeout applied when closing the connection.
+     *
+     * @return the closeTimeout
+     */
+    public int getCloseTimeout() {
+        return this.closeTimeout;
+    }
+
+    /**
+     * Sets the timeout before a close is considered complete. Normally a
+     * close() on a connection waits for confirmation from the broker; this
+     * allows that operation to timeout to save the client hanging if there is
+     * no broker
+     */
+    public void setCloseTimeout(int closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
+
+    /**
+     * @return ConnectionInfo
+     */
+    public ConnectionInfo getConnectionInfo() {
+        return this.info;
+    }
+
+    /**
+     * Reports whether retroactive consumers are enabled.
+     *
+     * @return the useRetroactiveConsumer flag
+     */
+    public boolean isUseRetroactiveConsumer() {
+        return this.useRetroactiveConsumer;
+    }
+
+    /**
+     * Sets whether or not retroactive consumers are enabled. Retroactive
+     * consumers allow non-durable topic subscribers to receive old messages
+     * that were published before the non-durable subscriber started.
+     */
+    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
+        this.useRetroactiveConsumer = useRetroactiveConsumer;
+    }
+
+    /**
+     * Reports whether nested Map and List structures are enabled.
+     *
+     * @return the nestedMapAndListEnabled flag
+     */
+    public boolean isNestedMapAndListEnabled() {
+        return this.nestedMapAndListEnabled;
+    }
+
+    /**
+     * Enables/disables whether or not Message properties and MapMessage entries
+     * support <a
+     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
+     * Structures</a> of Map and List objects
+     */
+    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
+        this.nestedMapAndListEnabled = structuredMapsEnabled;
+    }
+
+    /**
+     * Reports whether queue consumers should be exclusive.
+     *
+     * @return the exclusiveConsumer flag
+     */
+    public boolean isExclusiveConsumer() {
+        return this.exclusiveConsumer;
+    }
+
+    /**
+     * Enables or disables whether or not queue consumers should be exclusive or
+     * not for example to preserve ordering when not using <a
+     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
+     * 
+     * @param exclusiveConsumer
+     */
+    public void setExclusiveConsumer(boolean exclusiveConsumer) {
+        this.exclusiveConsumer = exclusiveConsumer;
+    }
+
+    /**
+     * Registers a transport listener so that a client can be notified of
+     * events in the underlying transport.
+     *
+     * @param transportListener the listener to register
+     */
+    public void addTransportListener(TransportListener transportListener) {
+        this.transportListeners.add(transportListener);
+    }
+
+    /**
+     * Unregisters a transport listener from this connection.
+     *
+     * @param transportListener the listener to remove
+     */
+    public void removeTransportListener(TransportListener transportListener) {
+        this.transportListeners.remove(transportListener);
+    }
+
+    /**
+     * Gets the task runner factory held by this connection for its sessions.
+     *
+     * @return the sessionTaskRunner
+     */
+    public TaskRunnerFactory getSessionTaskRunner() {
+        return this.sessionTaskRunner;
+    }
+
+    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+        this.sessionTaskRunner = sessionTaskRunner;
+    }
+
+    /**
+     * Gets the message transformer configured on this connection.
+     *
+     * @return the transformer
+     */
+    public MessageTransformer getTransformer() {
+        return this.transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on
+     * to the JMS bus or when they are received from the bus but before they are
+     * delivered to the JMS client
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
+    /**
+     * @return the statsEnabled
+     */
+    public boolean isStatsEnabled() {
+        return this.stats.isEnabled();
+    }
+
+    /**
+     * Enables or disables statistics gathering on the connection's stats
+     * object.
+     *
+     * @param statsEnabled the statsEnabled to set
+     */
+    public void setStatsEnabled(boolean statsEnabled) {
+        stats.setEnabled(statsEnabled);
+    }
+
+    /**
+     * Returns the {@link DestinationSource} object which can be used to listen to destinations
+     * being created or destroyed or to enquire about the current destinations available on the broker.
+     * The source is created and started on the first call and reused afterwards.
+     *
+     * @return a lazily created destination source
+     * @throws JMSException if starting the newly created destination source fails
+     */
+    public DestinationSource getDestinationSource() throws JMSException {
+        if (destinationSource != null) {
+            return destinationSource;
+        }
+        // First request: build the source and start it before handing it out.
+        destinationSource = new DestinationSource(this);
+        destinationSource.start();
+        return destinationSource;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * Used internally for adding Sessions to the Connection
+     * 
+     * @param session
+     * @throws JMSException
+     * @throws JMSException
+     */
+    protected void addSession(ActiveMQSession session) throws JMSException {
+        this.sessions.add(session);
+        if (sessions.size() > 1 || session.isTransacted()) {
+            optimizedMessageDispatch = false;
+        }
+    }
+
+    /**
+     * Used internally for removing Sessions from a Connection. The session is
+     * also removed as a dispatcher.
+     *
+     * @param session the session to unregister
+     */
+    protected void removeSession(ActiveMQSession session) {
+        sessions.remove(session);
+        removeDispatcher(session);
+    }
+
+    /**
+     * Registers a ConnectionConsumer with this connection.
+     *
+     * @param connectionConsumer the connection consumer to register
+     * @throws JMSException declared for callers; not thrown by this body
+     */
+    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
+        connectionConsumers.add(connectionConsumer);
+    }
+
+    /**
+     * Unregisters a ConnectionConsumer from this connection and removes it as
+     * a dispatcher.
+     *
+     * @param connectionConsumer the connection consumer to unregister
+     */
+    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
+        connectionConsumers.remove(connectionConsumer);
+        removeDispatcher(connectionConsumer);
+    }
+
+    /**
+     * Creates a <CODE>TopicSession</CODE> object.
+     * 
+     * @param transacted indicates whether the session is transacted
+     * @param acknowledgeMode indicates whether the consumer or the client will
+     *                acknowledge any messages it receives; ignored if the
+     *                session is transacted. Legal values are
+     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
+     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return a newly created topic session
+     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
+     *                 to create a session due to some internal error or lack of
+     *                 support for the specific transaction and acknowledgement
+     *                 mode.
+     * @see Session#AUTO_ACKNOWLEDGE
+     * @see Session#CLIENT_ACKNOWLEDGE
+     * @see Session#DUPS_OK_ACKNOWLEDGE
+     */
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
+    }
+
+    /**
+     * Creates a connection consumer on a topic for this connection (optional
+     * operation). This is an expert facility not used by regular JMS clients.
+     * Delegates to the five-argument overload with <CODE>noLocal</CODE> set
+     * to <CODE>false</CODE>.
+     *
+     * @param topic the topic to access
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param sessionPool the server session pool to associate with this
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time
+     * @return the connection consumer
+     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
+     *                 to create a connection consumer due to some internal
+     *                 error or invalid arguments for <CODE>sessionPool</CODE>
+     *                 and <CODE>messageSelector</CODE>.
+     * @throws javax.jms.InvalidDestinationException if an invalid topic is
+     *                 specified.
+     * @throws javax.jms.InvalidSelectorException if the message selector is
+     *                 invalid.
+     * @see javax.jms.ConnectionConsumer
+     */
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        final boolean noLocal = false;
+        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, noLocal);
+    }
+
+    /**
+     * Creates a connection consumer on a queue for this connection (optional
+     * operation). This is an expert facility not used by regular JMS clients.
+     * Delegates to the five-argument overload with <CODE>noLocal</CODE> set
+     * to <CODE>false</CODE>.
+     *
+     * @param queue the queue to access
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param sessionPool the server session pool to associate with this
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time
+     * @return the connection consumer
+     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
+     *                 to create a connection consumer due to some internal
+     *                 error or invalid arguments for <CODE>sessionPool</CODE>
+     *                 and <CODE>messageSelector</CODE>.
+     * @throws javax.jms.InvalidDestinationException if an invalid queue is
+     *                 specified.
+     * @throws javax.jms.InvalidSelectorException if the message selector is
+     *                 invalid.
+     * @see javax.jms.ConnectionConsumer
+     */
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        final boolean noLocal = false;
+        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, noLocal);
+    }
+
+    /**
+     * Creates a connection consumer on a destination for this connection
+     * (optional operation). This is an expert facility not used by regular
+     * JMS clients. Delegates to the five-argument overload with
+     * <CODE>noLocal</CODE> set to <CODE>false</CODE>.
+     *
+     * @param destination the destination to access
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param sessionPool the server session pool to associate with this
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time
+     * @return the connection consumer
+     * @throws JMSException if the <CODE>Connection</CODE> object fails to
+     *                 create a connection consumer due to some internal error
+     *                 or invalid arguments for <CODE>sessionPool</CODE> and
+     *                 <CODE>messageSelector</CODE>.
+     * @throws javax.jms.InvalidDestinationException if an invalid destination
+     *                 is specified.
+     * @throws javax.jms.InvalidSelectorException if the message selector is
+     *                 invalid.
+     * @see javax.jms.ConnectionConsumer
+     * @since 1.1
+     */
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        final boolean noLocal = false;
+        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, noLocal);
+    }
+
+    /**
+     * Creates a connection consumer for the given destination, optionally
+     * filtering out locally published messages. The consumer's prefetch size
+     * is taken from <CODE>maxMessages</CODE>, and any <CODE>consumer.</CODE>
+     * prefixed options on the destination are applied to the consumer info.
+     *
+     * @param destination the destination to access
+     * @param messageSelector the message selector stored on the consumer info
+     * @param sessionPool the server session pool to associate with this
+     *                connection consumer
+     * @param maxMessages the maximum number of messages that can be assigned to
+     *                a server session at one time
+     * @param noLocal the noLocal setting stored on the consumer info
+     * @return the connection consumer
+     * @throws JMSException if the connection is closed or failed, or the
+     *                 consumer cannot be created
+     */
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
+        throws JMSException {
+
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        ConsumerInfo subscription = new ConsumerInfo(createConsumerId());
+        subscription.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
+        subscription.setSelector(messageSelector);
+        subscription.setPrefetchSize(maxMessages);
+        subscription.setNoLocal(noLocal);
+        subscription.setDispatchAsync(isDispatchAsync());
+
+        // Let options declared on the destination configure the consumer.
+        if (subscription.getDestination().getOptions() != null) {
+            Map<String, String> destinationOptions = new HashMap<String, String>(subscription.getDestination().getOptions());
+            IntrospectionSupport.setProperties(subscription, destinationOptions, "consumer.");
+        }
+
+        return new ActiveMQConnectionConsumer(this, sessionPool, subscription);
+    }
+
+    /**
+     * Builds a consumer id from the connection's session id and the next
+     * value of the consumer id generator.
+     *
+     * @return a new ConsumerId
+     */
+    private ConsumerId createConsumerId() {
+        return new ConsumerId(this.connectionSessionId, this.consumerIdGenerator.getNextSequenceId());
+    }
+
+    /**
+     * Builds a producer id from the connection's session id and the next
+     * value of the producer id generator.
+     *
+     * @return a new ProducerId
+     */
+    private ProducerId createProducerId() {
+        return new ProducerId(this.connectionSessionId, this.producerIdGenerator.getNextSequenceId());
+    }
+
+    /**
+     * Creates a <CODE>QueueSession</CODE> object.
+     * 
+     * @param transacted indicates whether the session is transacted
+     * @param acknowledgeMode indicates whether the consumer or the client will
+     *                acknowledge any messages it receives; ignored if the
+     *                session is transacted. Legal values are
+     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
+     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return a newly created queue session
+     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
+     *                 to create a session due to some internal error or lack of
+     *                 support for the specific transaction and acknowledgement
+     *                 mode.
+     * @see Session#AUTO_ACKNOWLEDGE
+     * @see Session#CLIENT_ACKNOWLEDGE
+     * @see Session#DUPS_OK_ACKNOWLEDGE
+     */
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
+    }
+
+    /**
+     * Ensures that the clientID was manually specified and not auto-generated.
+     * This check is used to make sure that the clientID + durableSubscriber
+     * name are used correctly.
+     *
+     * @throws JMSException if no clientID was specified by the user
+     */
+    public void checkClientIDWasManuallySpecified() throws JMSException {
+        if (userSpecifiedClientID) {
+            return;
+        }
+        throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
+    }
+
+    /**
+     * Sends a command through the connection without waiting for a response -
+     * for internal use only.
+     *
+     * @param command the command to send
+     * @throws JMSException if the connection is closed or the send fails
+     */
+    public void asyncSendPacket(Command command) throws JMSException {
+        if (isClosed()) {
+            throw new ConnectionClosedException();
+        }
+        doAsyncSendPacket(command);
+    }
+
+    /**
+     * Hands the command to the transport as a one-way send, without checking
+     * whether the connection is closed.
+     *
+     * @param command the command to send
+     * @throws JMSException wrapping any IOException raised by the transport
+     */
+    private void doAsyncSendPacket(Command command) throws JMSException {
+        try {
+            transport.oneway(command);
+        } catch (IOException sendFailure) {
+            throw JMSExceptionSupport.create(sendFailure);
+        }
+    }
+
+    /**
+     * Send a packet through a Connection - for internal use only
+     * 
+     * @param command
+     * @return
+     * @throws JMSException
+     */
+    public Response syncSendPacket(Command command) throws JMSException {
+        if (isClosed()) {
+            throw new ConnectionClosedException();
+        } else {
+
+            try {
+                Response response = (Response)this.transport.request(command);
+                if (response.isException()) {
+                    ExceptionResponse er = (ExceptionResponse)response;
+                    if (er.getException() instanceof JMSException) {
+                        throw (JMSException)er.getException();
+                    } else {
+                        if (isClosed()||closing.get()) {
+                            LOG.debug("Received an exception but connection is closing");
+                        }
+                        JMSException jmsEx = null;
+                        try {
+                         jmsEx = JMSExceptionSupport.create(er.getException());
+                        }catch(Throwable e) {
+                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
+                        }
+                        if(jmsEx !=null) {
+                            throw jmsEx;
+                        }
+                    }
+                }
+                return response;
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+
+    /**
+     * Sends a command through the connection and waits for the response,
+     * passing the given timeout to the transport - for internal use only.
+     *
+     * @param command the command to send
+     * @param timeout the timeout handed to the transport request
+     * @return the response produced by the underlying send
+     * @throws JMSException if the connection is closed or closing, or the
+     *                 send fails
+     */
+    public Response syncSendPacket(Command command, int timeout) throws JMSException {
+        if (isClosed() || closing.get()) {
+            throw new ConnectionClosedException();
+        }
+        return doSyncSendPacket(command, timeout);
+    }
+
+    /**
+     * Performs a timed request on the transport without checking whether the
+     * connection is closed. A non-null exception response is rethrown as a
+     * JMSException.
+     *
+     * @param command the command to send
+     * @param timeout the timeout handed to the transport request
+     * @return the transport's response, which may be null
+     * @throws JMSException if the response carries an exception or the
+     *                 transport raises an IOException
+     */
+    private Response doSyncSendPacket(Command command, int timeout)
+            throws JMSException {
+        try {
+            Response reply = (Response)this.transport.request(command, timeout);
+            if (reply == null || !reply.isException()) {
+                return reply;
+            }
+            ExceptionResponse er = (ExceptionResponse)reply;
+            if (er.getException() instanceof JMSException) {
+                throw (JMSException)er.getException();
+            }
+            throw JMSExceptionSupport.create(er.getException());
+        } catch (IOException e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Gets the statistics object of this connection.
+     *
+     * @return statistics for this Connection
+     */
+    public StatsImpl getStats() {
+        return this.stats;
+    }
+
+    /**
+     * simply throws an exception if the Connection is already closed or the
+     * Transport has failed
+     * 
+     * @throws JMSException
+     */
+    protected synchronized void checkClosedOrFailed() throws JMSException {
+        checkClosed();
+        if (transportFailed.get()) {
+            throw new ConnectionFailedException(firstFailureError);
+        }
+    }
+
+    /**
+     * simply throws an exception if the Connection is already closed
+     * 
+     * @throws JMSException
+     */
+    protected synchronized void checkClosed() throws JMSException {
+        if (closed.get()) {
+            throw new ConnectionClosedException();
+        }
+    }
+
+    /**
+     * Send the ConnectionInfo to the Broker
+     * 
+     * @throws JMSException
+     */
+    protected void ensureConnectionInfoSent() throws JMSException {
+        synchronized(this.ensureConnectionInfoSentMutex) {
+            // Can we skip sending the ConnectionInfo packet??
+            if (isConnectionInfoSentToBroker || closed.get()) {
+                return;
+            }
+            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
+            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
+                info.setClientId(clientIdGenerator.generateId());
+            }
+            syncSendPacket(info.copy());
+    
+            this.isConnectionInfoSentToBroker = true;
+            // Add a temp destination advisory consumer so that
+            // We know what the valid temporary destinations are on the
+            // broker without having to do an RPC to the broker.
+    
+            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
+            if (watchTopicAdvisories) {
+                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
+            }
+        }
+    }
+
+    /**
+     * Reports whether this connection watches topic advisories.
+     *
+     * @return the watchTopicAdvisories flag
+     */
+    public synchronized boolean isWatchTopicAdvisories() {
+        return this.watchTopicAdvisories;
+    }
+
+    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
+        this.watchTopicAdvisories = watchTopicAdvisories;
+    }
+
+    /**
+     * Reports whether async sends are forced on this connection.
+     *
+     * @return the useAsyncSend flag
+     */
+    public boolean isUseAsyncSend() {
+        return this.useAsyncSend;
+    }
+
+    /**
+     * Forces the use of <a
+     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
+     * adds a massive performance boost; but means that the send() method will
+     * return immediately whether the message has been sent or not which could
+     * lead to message loss.
+     */
+    public void setUseAsyncSend(boolean useAsyncSend) {
+        this.useAsyncSend = useAsyncSend;
+    }
+
+    /**
+     * @return true if always sync send messages
+     */
+    public boolean isAlwaysSyncSend() {
+        return this.alwaysSyncSend;
+    }
+
+    /**
+     * Set true if always require messages to be sync sent
+     * 
+     * @param alwaysSyncSend
+     */
+    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+        this.alwaysSyncSend = alwaysSyncSend;
+    }
+
+    /**
+     * Cleans up this connection so that its state is as if the connection was
+     * just created. This allows the Resource Adapter to clean up a connection
+     * so that it can be reused without having to close and recreate the
+     * connection.
+     *
+     * @throws JMSException if disposing a child resource or notifying the
+     *                 broker fails
+     */
+    public void cleanup() throws JMSException {
+
+        if (advisoryConsumer != null && !isTransportFailed()) {
+            advisoryConsumer.dispose();
+            advisoryConsumer = null;
+        }
+
+        // Dispose everything that was created from this connection.
+        for (ActiveMQSession session : this.sessions) {
+            session.dispose();
+        }
+        for (ActiveMQConnectionConsumer consumer : this.connectionConsumers) {
+            consumer.dispose();
+        }
+        for (ActiveMQInputStream in : this.inputStreams) {
+            in.dispose();
+        }
+        for (ActiveMQOutputStream out : this.outputStreams) {
+            out.dispose();
+        }
+
+        if (isConnectionInfoSentToBroker) {
+            // Only tell the broker when the transport is healthy and the
+            // connection is not closing.
+            if (!(transportFailed.get() || closing.get())) {
+                asyncSendPacket(info.createRemoveCommand());
+            }
+            isConnectionInfoSentToBroker = false;
+        }
+        if (userSpecifiedClientID) {
+            info.setClientId(null);
+            userSpecifiedClientID = false;
+        }
+        clientIDSet = false;
+
+        started.set(false);
+    }
+
+    /**
+     * Changes the username/password associated with this connection. If the
+     * connection has been used, you must call cleanup() before calling this
+     * method.
+     *
+     * @param userName the user name to store on the connection info
+     * @param password the password to store on the connection info
+     * @throws IllegalStateException if the connection info has already been
+     *                 sent to the broker
+     */
+    public void changeUserInfo(String userName, String password) throws JMSException {
+        final boolean inUse = isConnectionInfoSentToBroker;
+        if (inUse) {
+            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
+        }
+        info.setUserName(userName);
+        info.setPassword(password);
+    }
+
+    /**
+     * Gets the resource manager id, which is the value of the broker id
+     * reported in the broker info, after waiting for that info.
+     *
+     * @return the resourceManagerId
+     * @throws JMSException if no broker info is available after waiting
+     */
+    public String getResourceManagerId() throws JMSException {
+        waitForBrokerInfo();
+        if (brokerInfo != null) {
+            return brokerInfo.getBrokerId().getValue();
+        }
+        throw new JMSException("Connection failed before Broker info was received.");
+    }
+
+    /**
+     * Returns the broker name if one is available. This call waits up to five
+     * seconds for the broker info to arrive; it returns null if the info is
+     * still unavailable afterwards or if the waiting thread is interrupted
+     * (the interrupt flag is restored in that case).
+     */
+    public String getBrokerName() {
+        try {
+            brokerInfoReceived.await(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
+        }
+        return brokerInfo == null ? null : brokerInfo.getBrokerName();
+    }
+
+    /**
+     * Returns the broker information without waiting for it.
+     *
+     * @return the broker info, or null if it is not available yet.
+     */
+    public BrokerInfo getBrokerInfo() {
+        return this.brokerInfo;
+    }
+
+    /**
+     * Returns the redelivery policy currently configured on this connection.
+     *
+     * @return Returns the RedeliveryPolicy.
+     * @throws JMSException declared by the signature; this implementation
+     *                 simply returns the field.
+     */
+    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
+        return this.redeliveryPolicy;
+    }
+
+    /**
+     * Sets the redelivery policy to be used when messages are rolled back.
+     *
+     * @param redeliveryPolicy the policy to store on this connection
+     */
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    /**
+     * Returns the BLOB transfer policy, creating it via
+     * createBlobTransferPolicy() on first use when none has been set.
+     *
+     * @return the blob transfer policy of this connection
+     */
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        if (blobTransferPolicy != null) {
+            return blobTransferPolicy;
+        }
+        blobTransferPolicy = createBlobTransferPolicy();
+        return blobTransferPolicy;
+    }
+
+    /**
+     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
+     * OBjects) are transferred from producers to brokers to consumers.
+     *
+     * @param blobTransferPolicy the policy to store on this connection
+     */
+    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
+        this.blobTransferPolicy = blobTransferPolicy;
+    }
+
+    /**
+     * Reports the current value of the alwaysSessionAsync flag.
+     *
+     * @return Returns the alwaysSessionAsync.
+     */
+    public boolean isAlwaysSessionAsync() {
+        return this.alwaysSessionAsync;
+    }
+
+    /**
+     * Sets the alwaysSessionAsync flag, which controls whether a separate
+     * thread is used for dispatching messages for each Session in the
+     * Connection. Regardless of the flag, a separate thread is always used if
+     * there is more than one session, or the session isn't in auto
+     * acknowledge or duplicates ok mode.
+     * <p>
+     * NOTE(review): the earlier wording stated that a separate thread is
+     * <em>not</em> used when the flag is set, which looks inverted given the
+     * flag's name - confirm against the session dispatch code.
+     *
+     * @param alwaysSessionAsync the new flag value
+     */
+    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
+        this.alwaysSessionAsync = alwaysSessionAsync;
+    }
+
+    /**
+     * Reports whether the optimised acknowledgement mode is enabled.
+     *
+     * @return Returns the optimizeAcknowledge.
+     */
+    public boolean isOptimizeAcknowledge() {
+        return this.optimizeAcknowledge;
+    }
+
+    /**
+     * Enables or disables an optimised acknowledgement mode where messages
+     * are acknowledged in batches rather than individually.
+     *
+     * @param optimizeAcknowledge The optimizeAcknowledge to set.
+     */
+    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
+        this.optimizeAcknowledge = optimizeAcknowledge;
+    }
+
+    /**
+     * @return the configured timeout after which a warning about an unstarted
+     *         connection is generated
+     */
+    public long getWarnAboutUnstartedConnectionTimeout() {
+        return this.warnAboutUnstartedConnectionTimeout;
+    }
+
+    /**
+     * Sets the timeout, measured from connection creation, after which a
+     * warning is generated if the connection has not been started via
+     * {@link #start()} and a message is received by a consumer. It is a very
+     * common gotcha to forget to <a
+     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
+     * the connection</a>, so by default a warning is created if the user
+     * forgets. To disable the warning set the value to a negative number
+     * (i.e. &lt; 0, say -1).
+     *
+     * @param warnAboutUnstartedConnectionTimeout the timeout to store
+     */
+    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
+        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
+    }
+    
+    /**
+     * Returns the send timeout configured on this connection.
+     *
+     * @return the sendTimeout
+     */
+    public int getSendTimeout() {
+        return this.sendTimeout;
+    }
+
+    /**
+     * Stores the send timeout for this connection.
+     *
+     * @param sendTimeout the sendTimeout to set
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+    
+    /**
+     * Reports the current value of the sendAcksAsync flag.
+     *
+     * @return the sendAcksAsync
+     */
+    public boolean isSendAcksAsync() {
+        return this.sendAcksAsync;
+    }
+
+    /**
+     * Stores the sendAcksAsync flag for this connection.
+     *
+     * @param sendAcksAsync the sendAcksAsync to set
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+
+
+    /**
+     * Returns the time this connection was created.
+     *
+     * @return the stored creation time
+     */
+    public long getTimeCreated() {
+        return this.timeCreated;
+    }
+
+    /**
+     * Blocks, without a timeout, until brokerInfoReceived has been counted
+     * down.
+     *
+     * @throws JMSException wrapping the InterruptedException if the waiting
+     *                 thread is interrupted; the interrupt flag is restored
+     *                 before the exception is thrown.
+     */
+    private void waitForBrokerInfo() throws JMSException {
+        try {
+            brokerInfoReceived.await();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw JMSExceptionSupport.create(interrupted);
+        }
+    }
+
+    /**
+     * Returns the transport used by this connection. Exposed so that it can
+     * be used in unit tests.
+     *
+     * @return the underlying transport
+     */
+    public Transport getTransport() {
+        return this.transport;
+    }
+
+    /**
+     * Registers a producer under its id so that producer acks received by
+     * this connection can be routed to it.
+     *
+     * @param producerId the key to register the producer under
+     * @param producer the producer to register
+     */
+    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
+        this.producers.put(producerId, producer);
+    }
+
+    /**
+     * Removes the producer registered under the given id, if any.
+     *
+     * @param producerId the id of the producer to unregister
+     */
+    public void removeProducer(ProducerId producerId) {
+        this.producers.remove(producerId);
+    }
+
+    /**
+     * Registers a dispatcher under a consumer id so that message dispatches
+     * received for that consumer can be handed to it.
+     *
+     * @param consumerId the key to register the dispatcher under
+     * @param dispatcher the dispatcher to register
+     */
+    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
+        this.dispatchers.put(consumerId, dispatcher);
+    }
+
+    /**
+     * Removes the dispatcher registered under the given consumer id, if any.
+     *
+     * @param consumerId the id whose dispatcher should be unregistered
+     */
+    public void removeDispatcher(ConsumerId consumerId) {
+        this.dispatchers.remove(consumerId);
+    }
+
+    /**
+     * @param o - the command to consume
+     */
+    public void onCommand(final Object o) {
+        final Command command = (Command)o;
+        if (!closed.get() && command != null) {
+            try {
+                command.visit(new CommandVisitorAdapter() {
+                    @Override
+                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
+                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
+                        if (dispatcher != null) {
+                            // Copy in case a embedded broker is dispatching via
+                            // vm://
+                            // md.getMessage() == null to signal end of queue
+                            // browse.
+                            Message msg = md.getMessage();
+                            if (msg != null) {
+                                msg = msg.copy();
+                                msg.setReadOnlyBody(true);
+                                msg.setReadOnlyProperties(true);
+                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
+                                msg.setConnection(ActiveMQConnection.this);
+                                md.setMessage(msg);
+                            }
+                            dispatcher.dispatch(md);
+                        }
+                        return null;
+                    }
+
+                    @Override
+                    public Response processProducerAck(ProducerAck pa) throws Exception {
+                        if (pa != null && pa.getProducerId() != null) {
+                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
+                            if (producer != null) {
+                                producer.onProducerAck(pa);
+                            }
+                        }
+                        return null;
+                    }
+
+                    @Override
+                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
+                        brokerInfo = info;
+                        brokerInfoReceived.countDown();
+                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
+                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
+                        return null;
+                    }
+
+                    @Override
+                    public Response processConnectionError(final ConnectionError error) throws Exception {
+                        asyncConnectionThread.execute(new Runnable() {
+                            public void run() {
+                                onAsyncException(error.getException());
+                            }
+                        });
+                        return null;
+                    }
+
+                    @Override
+                    public Response processControlCommand(ControlCommand command) throws Exception {
+                        onControlCommand(command);
+                        return null;
+                    }
+
+                    @Override
+                    public Response processConnectionControl(ConnectionControl control) throws Exception {
+                        onConnectionControl((ConnectionControl)command);
+                        return null;
+                    }
+
+                    @Override
+                    public Response processConsumerControl(ConsumerControl control) throws Exception {
+                        onConsumerControl((ConsumerControl)command);
+                        return null;
+                    }
+
+                    @Override
+                    public Response processWireFormat(WireFormatInfo info) throws Exception {
+                        onWireFormatInfo((WireFormatInfo)command);
+                        return null;
+                    }
+                });
+            } catch (Exception e) {
+                onClientInternalException(e);
+            }
+
+        }
+        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
+            TransportListener listener = iter.next();
+            listener.onCommand(command);
+        }
+    }
+
+    protected void onWireFormatInfo(WireFormatInfo info) {
+        protocolVersion.set(info.getVersion());
+    }
+
+    /**
+     * Handles async client internal exceptions.
+     * A client internal exception is usually one that has been thrown
+     * by a container runtime component during asynchronous processing of a
+     * message that does not affect the connection itself.
+     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
+     * its <code>onException</code> method, if one has been registered with this connection.
+     * 
+     * @param error the exception that the problem
+     */
+    public void onClientInternalException(final Throwable error) {
+        if ( !closed.get() && !closing.get() ) {
+            if ( this.clientInternalExceptionListener != null ) {
+                asyncConnectionThread.execute(new Runnable() {
+                    public void run() {
+                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
+                    }
+                });
+            } else {
+                LOG.debug("Async client internal exception occurred with no exception listener registered: " 
+                        + error, error);
+            }
+        }
+    }
+    /**
+     * Used for handling async exceptions. When the connection is neither
+     * closed nor closing, the registered exception listener (if any) is
+     * notified on the async connection thread; errors that are not already a
+     * JMSException are converted with JMSExceptionSupport first. Without a
+     * listener the error is only logged at debug level.
+     *
+     * @param error the problem to report
+     */
+    public void onAsyncException(Throwable error) {
+        if (closed.get() || closing.get()) {
+            return;
+        }
+
+        // Read the listener field exactly once. The notification runs later
+        // on another thread, and re-reading the field there could hit a null
+        // if the listener was cleared after the null check below.
+        final javax.jms.ExceptionListener listener = this.exceptionListener;
+        if (listener == null) {
+            LOG.debug("Async exception with no exception listener: " + error, error);
+            return;
+        }
+
+        if (!(error instanceof JMSException)) {
+            error = JMSExceptionSupport.create(error);
+        }
+        final JMSException e = (JMSException)error;
+
+        asyncConnectionThread.execute(new Runnable() {
+            public void run() {
+                listener.onException(e);
+            }
+        });
+    }
+
+    /**
+     * Called when the transport reports an IOException. The error is first
+     * reported through onAsyncException. Then, unless the connection is
+     * closing or closed, a task is queued on the async connection thread
+     * that marks the transport as failed, disposes the transport, releases
+     * anyone waiting for broker info, cleans up the connection and finally
+     * notifies the registered transport listeners.
+     *
+     * @param error the transport failure
+     */
+    public void onException(final IOException error) {
+        onAsyncException(error);
+        if (closing.get() || closed.get()) {
+            return;
+        }
+        asyncConnectionThread.execute(new Runnable() {
+            public void run() {
+                transportFailed(error);
+                ServiceSupport.dispose(ActiveMQConnection.this.transport);
+                // Wake up threads blocked waiting for broker info.
+                brokerInfoReceived.countDown();
+                try {
+                    cleanup();
+                } catch (JMSException cleanupFailure) {
+                    LOG.warn("Exception during connection cleanup, " + cleanupFailure, cleanupFailure);
+                }
+                for (TransportListener listener : transportListeners) {
+                    listener.onException(error);
+                }
+            }
+        });
+    }
+
+    /**
+     * Called when the transport is interrupted: every session clears its
+     * messages in progress, then the registered transport listeners are
+     * notified.
+     */
+    public void transportInterupted() {
+        for (ActiveMQSession session : this.sessions) {
+            session.clearMessagesInProgress();
+        }
+        for (TransportListener listener : transportListeners) {
+            listener.transportInterupted();
+        }
+    }
+
+    /**
+     * Called when the transport resumes; forwards the event to every
+     * registered transport listener.
+     */
+    public void transportResumed() {
+        for (TransportListener listener : transportListeners) {
+            listener.transportResumed();
+        }
+    }
+
+    /**
+     * Creates a new temporary destination owned by this connection, announces
+     * it to the broker with a synchronous add request and records it as an
+     * active temporary destination.
+     *
+     * @param topic - if true a temporary topic is created, else a temporary
+     *                queue.
+     * @return the newly created temporary destination
+     * @throws JMSException if the add request fails
+     */
+    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
+
+        // Build the destination itself, named after this connection's id
+        // plus the next sequence number.
+        ActiveMQTempDestination tempDest;
+        if (topic) {
+            tempDest = new ActiveMQTempTopic(this.info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
+        } else {
+            tempDest = new ActiveMQTempQueue(this.info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
+        }
+
+        // Tell the broker about it before exposing it locally.
+        DestinationInfo addRequest = new DestinationInfo();
+        addRequest.setConnectionId(this.info.getConnectionId());
+        addRequest.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        addRequest.setDestination(tempDest);
+        syncSendPacket(addRequest);
+
+        tempDest.setConnection(this);
+        activeTempDestinations.put(tempDest, tempDest);
+        return tempDest;
+    }
+
+    /**
+     * Deletes a temporary destination: it is dropped from the set of active
+     * temporary destinations and a synchronous remove request is sent to the
+     * broker.
+     *
+     * @param destination the temporary destination to delete
+     * @throws JMSException if the connection is closed or failed, if any
+     *                 session reports the destination as in use, or if the
+     *                 remove request fails
+     */
+    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
+
+        checkClosedOrFailed();
+
+        // Refuse while any session still uses the destination.
+        for (ActiveMQSession session : this.sessions) {
+            if (session.isInUse(destination)) {
+                throw new JMSException("A consumer is consuming from the temporary destination");
+            }
+        }
+
+        activeTempDestinations.remove(destination);
+
+        DestinationInfo removeRequest = new DestinationInfo();
+        removeRequest.setConnectionId(this.info.getConnectionId());
+        removeRequest.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        removeRequest.setDestination(destination);
+        removeRequest.setTimeout(0);
+        syncSendPacket(removeRequest);
+    }
+
+    /**
+     * Reports whether the given destination is considered deleted, i.e. it is
+     * not among the active temporary destinations. If we are not watching
+     * the advisories (no advisory consumer) we assume that the destination
+     * does exist and report false.
+     *
+     * @param dest the destination to look up
+     * @return true only when advisories are watched and the destination is
+     *         not an active temporary destination
+     */
+    public boolean isDeleted(ActiveMQDestination dest) {
+        return advisoryConsumer != null && !activeTempDestinations.contains(dest);
+    }
+
+    /**
+     * @return the current value of the copyMessageOnSend flag
+     */
+    public boolean isCopyMessageOnSend() {
+        return this.copyMessageOnSend;
+    }
+
+    /**
+     * @return the sequence generator held by this connection for local
+     *         transaction ids
+     */
+    public LongSequenceGenerator getLocalTransactionIdGenerator() {
+        return this.localTransactionIdGenerator;
+    }
+
+    /**
+     * @return the current value of the useCompression flag
+     */
+    public boolean isUseCompression() {
+        return this.useCompression;
+    }
+
+    /**
+     * Enables or disables the use of compression of the message bodies.
+     *
+     * @param useCompression the new flag value
+     */
+    public void setUseCompression(boolean useCompression) {
+        this.useCompression = useCompression;
+    }
+
+    /**
+     * Asks the broker to remove the given destination by sending a
+     * synchronous remove request on behalf of this connection.
+     *
+     * @param destination the destination to remove
+     * @throws JMSException if the connection is closed or failed, or if the
+     *                 request fails
+     */
+    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
+
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        DestinationInfo removeRequest = new DestinationInfo();
+        removeRequest.setConnectionId(this.info.getConnectionId());
+        removeRequest.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        removeRequest.setDestination(destination);
+        removeRequest.setTimeout(0);
+        syncSendPacket(removeRequest);
+
+    }
+
+    /**
+     * @return the current value of the dispatchAsync flag
+     */
+    public boolean isDispatchAsync() {
+        return this.dispatchAsync;
+    }
+
+    /**
+     * Enables or disables the default setting of whether or not consumers have
+     * their messages <a
+     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
+     * synchronously or asynchronously by the broker</a>. For non-durable
+     * topics, for example, we typically dispatch synchronously by default to
+     * minimize context switches, which boosts performance. However, sometimes
+     * it's better to go slower to ensure that a single blocked consumer
+     * socket does not block delivery to other consumers.
+     *
+     * @param asyncDispatch If true then consumers created on this connection
+     *                will default to having their messages dispatched
+     *                asynchronously. The default value is false.
+     */
+    public void setDispatchAsync(boolean asyncDispatch) {
+        dispatchAsync = asyncDispatch;
+    }
+
+    /**
+     * @return the current value of the objectMessageSerializationDefered flag
+     */
+    public boolean isObjectMessageSerializationDefered() {
+        return this.objectMessageSerializationDefered;
+    }
+
+    /**
+     * When an object is set on an ObjectMessage, the JMS spec requires the
+     * object to be serialized by that set method. Enabling this flag causes the
+     * object to not get serialized at that point. The object may subsequently
+     * get serialized if the message needs to be sent over a socket or stored
+     * to disk.
+     *
+     * @param objectMessageSerializationDefered the new flag value
+     */
+    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
+        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
+    }
+
+    /**
+     * Creates an input stream on the destination with no message selector.
+     *
+     * @param dest the destination to read from
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createInputStream(Destination dest) throws JMSException {
+        final String noSelector = null;
+        return createInputStream(dest, noSelector);
+    }
+
+    /**
+     * Creates an input stream on the destination using the given message
+     * selector, with noLocal set to false.
+     *
+     * @param dest the destination to read from
+     * @param messageSelector the selector to apply, may be null
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
+        final boolean noLocal = false;
+        return createInputStream(dest, messageSelector, noLocal);
+    }
+
+    /**
+     * Creates an input stream on the destination using the given message
+     * selector and noLocal setting, without a subscription name.
+     *
+     * @param dest the destination to read from
+     * @param messageSelector the selector to apply, may be null
+     * @param noLocal the noLocal setting handed to the stream
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
+        final String noSubscriptionName = null;
+        return doCreateInputStream(dest, messageSelector, noLocal, noSubscriptionName);
+    }
+
+    /**
+     * Creates a durable input stream on the topic under the given
+     * subscription name, with no message selector.
+     *
+     * @param dest the topic to read from
+     * @param name the subscription name
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
+        // Delegate to the durable overload so the subscription name is
+        // actually used; delegating to createInputStream(...) dropped it and
+        // produced a plain, non-durable stream.
+        return createDurableInputStream(dest, name, null);
+    }
+
+    /**
+     * Creates a durable input stream on the topic under the given
+     * subscription name and message selector, with noLocal set to false.
+     *
+     * @param dest the topic to read from
+     * @param name the subscription name
+     * @param messageSelector the selector to apply, may be null
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
+        final boolean noLocal = false;
+        return createDurableInputStream(dest, name, messageSelector, noLocal);
+    }
+
+    /**
+     * Creates a durable input stream on the topic under the given
+     * subscription name, message selector and noLocal setting.
+     *
+     * @param dest the topic to read from
+     * @param name the subscription name
+     * @param messageSelector the selector to apply, may be null
+     * @param noLocal the noLocal setting handed to the stream
+     * @return the new input stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
+        final String subscriptionName = name;
+        return doCreateInputStream(dest, messageSelector, noLocal, subscriptionName);
+    }
+
+    /**
+     * Shared implementation behind the public input stream factories. Checks
+     * that the connection is usable, makes sure the connection info has been
+     * sent and then builds the stream with a fresh consumer id and the
+     * configured input stream prefetch.
+     *
+     * @param dest the destination to read from
+     * @param messageSelector the selector to apply, may be null
+     * @param noLocal the noLocal setting handed to the stream
+     * @param subName the subscription name, or null when none is used
+     * @return the new input stream
+     * @throws JMSException if the connection is closed or failed, or the
+     *                 stream cannot be created
+     */
+    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        return new ActiveMQInputStream(this,
+                                       createConsumerId(),
+                                       ActiveMQDestination.transform(dest),
+                                       messageSelector,
+                                       noLocal,
+                                       subName,
+                                       prefetchPolicy.getInputStreamPrefetch());
+    }
+
+    /**
+     * Creates a persistent output stream; individual messages will be written
+     * to disk/database by the broker. The stream uses the default delivery
+     * mode, priority and time to live of ActiveMQMessage and no stream
+     * properties.
+     *
+     * @param dest the destination to write to
+     * @return the new output stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public OutputStream createOutputStream(Destination dest) throws JMSException {
+        return createOutputStream(dest,
+                                  null,
+                                  ActiveMQMessage.DEFAULT_DELIVERY_MODE,
+                                  ActiveMQMessage.DEFAULT_PRIORITY,
+                                  ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
+    }
+
+    /**
+     * Creates a non persistent output stream; messages will not be written to
+     * disk. Priority and time to live use the ActiveMQMessage defaults and no
+     * stream properties are set.
+     *
+     * @param dest the destination to write to
+     * @return the new output stream
+     * @throws JMSException if the stream cannot be created
+     */
+    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
+        return createOutputStream(dest,
+                                  null,
+                                  DeliveryMode.NON_PERSISTENT,
+                                  ActiveMQMessage.DEFAULT_PRIORITY,
+                                  ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
+    }
+
+    /**
+     * Creates an output stream allowing full control over the delivery mode,
+     * the priority and time to live of the messages and the properties added to
+     * messages on the stream.
+     *
+     * @param dest the destination to write to
+     * @param streamProperties defines a map of key-value pairs where the keys
+     *                are strings and the values are primitive values (numbers
+     *                and strings) which are appended to the messages similarly
+     *                to using the
+     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
+     *                method
+     * @param deliveryMode the delivery mode for messages on the stream
+     * @param priority the priority for messages on the stream
+     * @param timeToLive the time to live for messages on the stream
+     * @return the new output stream
+     * @throws JMSException if the connection is closed or failed, or the
+     *                 stream cannot be created
+     */
+    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        return new ActiveMQOutputStream(this,
+                                        createProducerId(),
+                                        ActiveMQDestination.transform(dest),
+                                        streamProperties,
+                                        deliveryMode,
+                                        priority,
+                                        timeToLive);
+    }
+
+    /**
+     * Unsubscribes a durable subscription that has been created by a client.
+     * <P>
+     * This method deletes the state being maintained on behalf of the
+     * subscriber by its provider. It does so by synchronously sending a
+     * remove-subscription request carrying this connection's id and client
+     * id together with the subscription name.
+     * <P>
+     * It is erroneous for a client to delete a durable subscription while there
+     * is an active <CODE>MessageConsumer </CODE> or
+     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
+     * message is part of a pending transaction or has not been acknowledged in
+     * the session.
+     *
+     * @param name the name used to identify this subscription
+     * @throws JMSException if the session fails to unsubscribe to the durable
+     *                 subscription due to some internal error.
+     * @throws InvalidDestinationException if an invalid subscription name is
+     *                 specified.
+     * @since 1.1
+     */
+    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
+        checkClosedOrFailed();
+        RemoveSubscriptionInfo removal = new RemoveSubscriptionInfo();
+        removal.setConnectionId(getConnectionInfo().getConnectionId());
+        removal.setSubscriptionName(name);
+        removal.setClientId(getConnectionInfo().getClientId());
+        syncSendPacket(removal);
+    }
+
+    /**
+     * Internal send method optimized: - It does not copy the message - It can
+     * only handle ActiveMQ messages. - You can specify if the send is async or
+     * sync - Does not allow you to send /w a transaction.
+     */
+    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
+        checkClosedOrFailed();
+
+        if (destination.isTemporary() && isDeleted(destination)) {
+            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
+        }
+
+        msg.setJMSDestination(destination);
+        msg.setJMSDeliveryMode(deliveryMode);
+        long expiration = 0L;
+
+        if (!isDisableTimeStampsByDefault()) {
+            long timeStamp = System.currentTimeMillis();
+            msg.setJMSTimestamp(timeStamp);
+            if (timeToLive > 0) {
+                expiration = timeToLive + timeStamp;
+            }
+        }
+
+        msg.setJMSExpiration(expiration);
+        msg.setJMSPriority(priority);
+
+        msg.setJMSRedelivered(false);
+        msg.setMessageId(messageId);
+
+        msg.onSend();
+
+        msg.setProducerId(msg.getMessageId().getProducerId());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending message: " + msg);
+        }
+
+        if (async) {
+            asyncSendPacket(msg);
+        } else {
+            syncSendPacket(msg);
+        }
+
+    }
+
+    /**
+     * Registers an output stream with this connection.
+     *
+     * @param stream the stream to track
+     */
+    public void addOutputStream(ActiveMQOutputStream stream) {
+        this.outputStreams.add(stream);
+    }
+
+    /**
+     * Unregisters an output stream from this connection.
+     *
+     * @param stream the stream to stop tracking
+     */
+    public void removeOutputStream(ActiveMQOutputStream stream) {
+        this.outputStreams.remove(stream);
+    }
+
+    /**
+     * Registers an input stream with this connection.
+     *
+     * @param stream the stream to track
+     */
+    public void addInputStream(ActiveMQInputStream stream) {
+        this.inputStreams.add(stream);
+    }
+
+    /**
+     * Unregisters an input stream from this connection.
+     *
+     * @param stream the stream to stop tracking
+     */
+    public void removeInputStream(ActiveMQInputStream stream) {
+        this.inputStreams.remove(stream);
+    }
+
+    /**
+     * Reacts to a control command received by this connection. The only
+     * command text handled here is "shutdown", which logs a message and
+     * terminates the JVM via System.exit(0); any other text (or none) is
+     * ignored.
+     *
+     * @param command the control command received
+     */
+    protected void onControlCommand(ControlCommand command) {
+        if ("shutdown".equals(command.getCommand())) {
+            LOG.info("JVM told to shutdown");
+            System.exit(0);
+        }
+    }
+
+    /**
+     * Reacts to a connection control command. When the command is flagged as
+     * fault tolerant, optimized acknowledgement is switched off on this
+     * connection and on each of its sessions; otherwise nothing happens.
+     *
+     * @param command the connection control command received
+     */
+    protected void onConnectionControl(ConnectionControl command) {
+        if (!command.isFaultTolerant()) {
+            return;
+        }
+        this.optimizeAcknowledge = false;
+        for (ActiveMQSession session : this.sessions) {
+            session.setOptimizeAcknowledge(false);
+        }
+    }
+
+    protected void onConsumerControl(ConsumerControl command) {
+        if (command.isClose()) {
+            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
+                ActiveMQSession s = i.next();
+                s.close(command.getConsumerId());
+            }
+        } else {
+            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
+                ActiveMQSession s = i.next();
+                s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
+            }
+        }
+    }
+
+    protected void transportFailed(IOException error) {
+        transportFailed.set(true);
+        if (firstFailureError == null) {
+            firstFailureError = error;
+        }
+    }
+
+    /**
+     * Should a JMS message be copied to a new JMS Message object as part of the

[... 53 lines stripped ...]