You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:50 UTC

[26/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
new file mode 100644
index 0000000..b4d03af
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -0,0 +1,1128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.net.ssl.SSLContext;
+
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionId;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderClosedException;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.jms.util.ThreadPoolUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a JMS Connection
+ */
+public class JmsConnection implements Connection, TopicConnection, QueueConnection, ProviderListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);
+
+    private JmsConnectionInfo connectionInfo;
+
+    private final IdGenerator clientIdGenerator;
+    private boolean clientIdSet;
+    private boolean sendAcksAsync;
+    private ExceptionListener exceptionListener;
+    private final List<JmsSession> sessions = new CopyOnWriteArrayList<JmsSession>();
+    private final Map<JmsConsumerId, JmsMessageDispatcher> dispatchers =
+        new ConcurrentHashMap<JmsConsumerId, JmsMessageDispatcher>();
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean closing = new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
+    private final AtomicBoolean failed = new AtomicBoolean();
+    private final Object connectLock = new Object();
+    private IOException firstFailureError;
+    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+    private boolean messagePrioritySupported;
+
+    private final ThreadPoolExecutor executor;
+
+    private URI brokerURI;
+    private URI localURI;
+    private SSLContext sslContext;
+    private Provider provider;
+    private final Set<JmsConnectionListener> connectionListeners =
+        new CopyOnWriteArraySet<JmsConnectionListener>();
+    private final Map<JmsDestination, JmsDestination> tempDestinations =
+        new ConcurrentHashMap<JmsDestination, JmsDestination>();
+    private final AtomicLong sessionIdGenerator = new AtomicLong();
+    private final AtomicLong tempDestIdGenerator = new AtomicLong();
+    private final AtomicLong transactionIdGenerator = new AtomicLong();
+    private JmsMessageFactory messageFactory;
+
+    protected JmsConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+        // connectionId becomes this connection's JmsConnectionId; the provider is started before returning.
+        // This executor can be used for dispatching asynchronous tasks that might block or result
+        // in reentrant calls to this Connection that could block.  The thread in this executor
+        // will also serve as a means of preventing JVM shutdown should a client application
+        // not have its own mechanism for doing so.
+        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "QpidJMS Connection Executor: ");
+            }
+        });
+
+        // Build all connection state before this instance is registered as the provider's
+        // listener, so a callback from the provider cannot observe a null connectionInfo.
+        this.clientIdGenerator = clientIdGenerator;
+        this.connectionInfo = new JmsConnectionInfo(new JmsConnectionId(connectionId));
+        this.provider = provider;
+        this.provider.setProviderListener(this);
+        try {
+            this.provider.start();
+        } catch (Exception e) {
+            throw JmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Stops delivery, shuts down all sessions, then releases the executor and the provider.
+     * @throws JMSException if stopping or tearing down fails, see {@link javax.jms.Connection#close()}
+     */
+    @Override
+    public void close() throws JMSException {
+        boolean interrupted = Thread.interrupted(); // clears the flag; it is restored in the finally block
+
+        try {
+
+            if (!closed.get() && !failed.get()) {
+                // do not fail if already closed as specified by the JMS specification.
+                doStop(false);
+            }
+
+            synchronized (this) {
+                // Already closed: return early (the finally block below still runs).
+                if (closed.get()) {
+                    return;
+                }
+
+                closing.set(true);
+
+                for (JmsSession session : this.sessions) {
+                    session.shutdown();
+                }
+
+                this.sessions.clear();
+                this.tempDestinations.clear();
+
+                if (isConnected() && !failed.get()) {
+                    ProviderFuture request = new ProviderFuture();
+                    try {
+                        provider.destroy(connectionInfo, request);
+
+                        try {
+                            request.sync();
+                        } catch (Exception ex) {
+                            // TODO - Spec is a bit vague here, we don't fail if already closed but
+                            //        in this case we really aren't closed yet so there could be an
+                            //        argument that at this point an exception is still valid.
+                            if (ex.getCause() instanceof InterruptedException) {
+                                throw (InterruptedException) ex.getCause();
+                            }
+                            LOG.debug("Failed destroying Connection resource: {}", ex.getMessage());
+                        }
+                    } catch(ProviderClosedException pce) {
+                        LOG.debug("Ignoring provider closed exception during connection close");
+                    }
+                }
+
+                connected.set(false);
+                started.set(false);
+                closing.set(false);
+                closed.set(true);
+            }
+        } catch (Exception e) {
+            throw JmsExceptionSupport.create(e);
+        } finally {
+            try {
+                ThreadPoolUtils.shutdown(executor);
+            } catch (Throwable e) {
+                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
+            }
+            // The provider is closed even when the teardown above threw.
+            if (provider != null) {
+                provider.close();
+                provider = null;
+            }
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Shuts down the sessions, destroys the connection resource and resets local state.
+     */
+    protected void shutdown() throws JMSException {
+        // TODO - Once ConnectionConsumer is added we must shutdown those as well.
+        // Sessions are shut down before the connection resource itself is destroyed.
+        for (JmsSession child : this.sessions) {
+            child.shutdown();
+        }
+
+        // Skipped while close() is in progress ('closing' set); close() sends its own destroy.
+        if (isConnected() && !failed.get() && !closing.get()) {
+            destroyResource(connectionInfo);
+        }
+        // Forget an explicitly assigned client id.
+        if (clientIdSet) {
+            connectionInfo.setClientId(null);
+            clientIdSet = false;
+        }
+
+        tempDestinations.clear();
+        started.set(false);
+        connected.set(false);
+    }
+
+    /**
+     * Not implemented: checks the connection state, connects, then always throws.
+     * @param destination ignored by this implementation
+     * @param messageSelector ignored by this implementation
+     * @param sessionPool ignored by this implementation
+     * @param maxMessages ignored by this implementation
+     * @return never returns normally
+     * @throws JMSException always; "Not supported" once the state check and connect succeed
+     * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination, java.lang.String, javax.jms.ServerSessionPool, int)
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    /**
+     * Not implemented: checks the connection state, connects, then always throws.
+     * @param topic ignored by this implementation
+     * @param subscriptionName ignored by this implementation
+     * @param messageSelector ignored by this implementation
+     * @param sessionPool ignored by this implementation
+     * @param maxMessages ignored by this implementation
+     * @return never returns normally
+     * @throws JMSException always; "Not supported" once the state check and connect succeed
+     * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
+     *      java.lang.String, java.lang.String, javax.jms.ServerSessionPool, int)
+     */
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+                                                              String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    /**
+     * Creates a session, registers it and starts it if this connection is already started.
+     * @param transacted when true the session always uses Session.SESSION_TRANSACTED
+     * @param acknowledgeMode the requested mode, used as given when {@code transacted} is false
+     * @return the new session, see {@link javax.jms.Connection#createSession(boolean, int)}
+     * @throws JMSException if closed or failed, if connecting fails, or if the mode is rejected
+     */
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        final int effectiveMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        final JmsSession session = new JmsSession(this, getNextSessionId(), effectiveMode);
+        addSession(session);
+        if (started.get()) {
+            session.start();
+        }
+        return session;
+    }
+
+    /**
+     * @return the client id held by the connection info, see {@link javax.jms.Connection#getClientID()}
+     * @throws JMSException if this connection is closed or has failed
+     */
+    @Override
+    public String getClientID() throws JMSException {
+        checkClosedOrFailed();
+        return this.connectionInfo.getClientId();
+    }
+
+    /**
+     * @return the shared JmsConnectionMetaData.INSTANCE, see {@link javax.jms.Connection#getMetaData()}
+     * @throws JMSException if this connection is closed or has failed
+     */
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        checkClosedOrFailed();
+        return JmsConnectionMetaData.INSTANCE;
+    }
+
+    /**
+     * Sets the client id once, before connecting, then connects to ensure the id is valid.
+     * @param clientID the id to use; must not be null
+     * @throws JMSException if closed/failed, already set, null, already connected, or connect fails
+     */
+    @Override
+    public synchronized void setClientID(String clientID) throws JMSException {
+        checkClosedOrFailed();
+
+        if (this.clientIdSet) {
+            throw new IllegalStateException("The clientID has already been set");
+        }
+        if (clientID == null) {
+            throw new IllegalStateException("Cannot have a null clientID");
+        }
+        if (connected.get()) {
+            throw new IllegalStateException("Cannot set the client id once connected.");
+        }
+
+        this.connectionInfo.setClientId(clientID);
+        this.clientIdSet = true;
+
+        // We weren't connected if we got this far, so connect now to ensure the clientID is valid.
+        // TODO: determine if any resulting failure is only the result of the ClientID value, or other reasons such as auth.
+        connect();
+    }
+
+    /**
+     * Connects if needed and, unless already started, starts every session; see {@link javax.jms.Connection#start()}.
+     * @throws JMSException if closed or failed, or if connecting or starting a session fails
+     */
+    @Override
+    public void start() throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        if (this.started.compareAndSet(false, true)) {
+            try {
+                for (JmsSession session : this.sessions) {
+                    session.start();
+                }
+            } catch (Exception cause) {
+                throw JmsExceptionSupport.create(cause);
+            }
+        }
+    }
+
+    /**
+     * Delegates to {@link #doStop(boolean)} with the closed/failed check enabled; see {@link javax.jms.Connection#stop()}.
+     * @throws JMSException if this connection is closed or has failed, or a session fails to stop
+     */
+    @Override
+    public void stop() throws JMSException {
+        doStop(true);
+    }
+
+    /**
+     * Stops every session if this connection is currently started; otherwise does nothing.
+     * @param checkClosed <tt>true</tt> to first call checkClosedOrFailed(), which throws
+     *                    {@link javax.jms.IllegalStateException} when closed (or a failure
+     *                    exception when failed); <tt>false</tt> to skip this check
+     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
+     */
+    void doStop(boolean checkClosed) throws JMSException {
+        if (checkClosed) {
+            checkClosedOrFailed();
+        }
+        if (started.compareAndSet(true, false)) {
+            synchronized(sessions) {
+                for (JmsSession s : this.sessions) {
+                    s.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Not implemented: checks the connection state, connects, then always throws.
+     * @param topic ignored by this implementation
+     * @param messageSelector ignored by this implementation
+     * @param sessionPool ignored by this implementation
+     * @param maxMessages ignored by this implementation
+     * @return never returns normally (previously returned null, unlike the Destination overload)
+     * @throws JMSException always; "Not supported" once the state check and connect succeed
+     * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic, java.lang.String, javax.jms.ServerSessionPool, int)
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    /**
+     * Creates a topic session, registers it and starts it if this connection is already started.
+     * @param transacted when true the session always uses Session.SESSION_TRANSACTED
+     * @param acknowledgeMode the requested mode, used as given when {@code transacted} is false
+     * @return the new session, see {@link javax.jms.TopicConnection#createTopicSession(boolean, int)}
+     * @throws JMSException if closed or failed, if connecting fails, or if the mode is rejected
+     */
+    @Override
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        final int effectiveMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        final JmsTopicSession session = new JmsTopicSession(this, getNextSessionId(), effectiveMode);
+        addSession(session);
+        if (started.get()) {
+            session.start();
+        }
+        return session;
+    }
+
+    /**
+     * Not implemented: checks the connection state, connects, then always throws.
+     * @param queue ignored by this implementation
+     * @param messageSelector ignored by this implementation
+     * @param sessionPool ignored by this implementation
+     * @param maxMessages ignored by this implementation
+     * @return never returns normally (previously returned null, unlike the Destination overload)
+     * @throws JMSException always; "Not supported" once the state check and connect succeed
+     * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue, java.lang.String, javax.jms.ServerSessionPool, int)
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    /**
+     * Creates a queue session, registers it and starts it if this connection is already started.
+     * @param transacted when true the session always uses Session.SESSION_TRANSACTED
+     * @param acknowledgeMode the requested mode, used as given when {@code transacted} is false
+     * @return the new session, see {@link javax.jms.QueueConnection#createQueueSession(boolean, int)}
+     * @throws JMSException if closed or failed, if connecting fails, or if the mode is rejected
+     */
+    @Override
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        final int effectiveMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        final JmsQueueSession session = new JmsQueueSession(this, getNextSessionId(), effectiveMode);
+        addSession(session);
+        if (started.get()) {
+            session.start();
+        }
+        return session;
+    }
+
+    /**
+     * @param ex the error to convert with JmsExceptionSupport.create and pass to onException(JMSException)
+     */
+    public void onException(Exception ex) {
+        onException(JmsExceptionSupport.create(ex));
+    }
+
+    /**
+     * @param ex the error handed to the registered ExceptionListener; dropped when none is set
+     */
+    public void onException(JMSException ex) {
+        final ExceptionListener listener = this.exceptionListener;
+        if (listener != null) {
+            listener.onException(JmsExceptionSupport.create(ex));
+        }
+    }
+
+    protected int getSessionAcknowledgeMode(boolean transacted, int acknowledgeMode) throws JMSException {
+        // A transacted session always reports SESSION_TRANSACTED, whatever mode was requested.
+        if (transacted) {
+            return Session.SESSION_TRANSACTED;
+        }
+        if (acknowledgeMode == Session.SESSION_TRANSACTED) {
+            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+        }
+        return acknowledgeMode;
+    }
+
+    protected void removeSession(JmsSession session) throws JMSException {
+        this.sessions.remove(session); // only untracks the session; it is not stopped or shut down here
+    }
+
+    protected void addSession(JmsSession s) {
+        this.sessions.add(s); // only tracks the session; callers start it separately when needed
+    }
+
+    protected void addDispatcher(JmsConsumerId consumerId, JmsMessageDispatcher dispatcher) {
+        this.dispatchers.put(consumerId, dispatcher);
+    }
+
+    protected void removeDispatcher(JmsConsumerId consumerId) {
+        this.dispatchers.remove(consumerId);
+    }
+
+    private void connect() throws JMSException {
+        synchronized(this.connectLock) {
+            if (isConnected() || closed.get()) {
+                return;
+            }
+            // No usable client id was configured, so generate one before creating the resource.
+            if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
+                connectionInfo.setClientId(clientIdGenerator.generateId());
+            }
+            // Publish the message factory before raising 'connected' so isConnected() implies it is set.
+            this.connectionInfo = createResource(connectionInfo);
+            this.messageFactory = provider.getMessageFactory();
+            this.connected.set(true);
+
+            // TODO - Advisory Support.
+            //
+            // Providers should have an interface for adding a listener for temporary
+            // destination advisory messages for create / destroy so we can track them
+            // and throw exceptions when producers try to send to deleted destinations.
+        }
+    }
+
+    /**
+     * @return a temporary queue created through the provider and tracked by this connection.
+     */
+    protected TemporaryQueue createTemporaryQueue() throws JMSException {
+        // The name combines the connection id with a per-connection counter.
+        String name = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
+        JmsTemporaryQueue created = createResource(new JmsTemporaryQueue(name));
+        tempDestinations.put(created, created);
+        return created;
+    }
+
+    /**
+     * @return a temporary topic created through the provider and tracked by this connection.
+     */
+    protected TemporaryTopic createTemporaryTopic() throws JMSException {
+        // The name combines the connection id with a per-connection counter.
+        String name = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
+        JmsTemporaryTopic created = createResource(new JmsTemporaryTopic(name));
+        tempDestinations.put(created, created);
+        return created;
+    }
+
+    protected void deleteDestination(JmsDestination destination) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+
+        try {
+            // Refuse the delete while any session reports the destination as in use.
+            for (JmsSession candidate : this.sessions) {
+                if (candidate.isDestinationInUse(destination)) {
+                    throw new JMSException("A consumer is consuming from the temporary destination");
+                }
+            }
+            // Local tracking is dropped before the destroy request is issued.
+            if (destination.isTemporary()) {
+                tempDestinations.remove(destination);
+            }
+
+            destroyResource(destination);
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    protected void checkClosedOrFailed() throws JMSException {
+        checkClosed(); // the closed check runs first, so "closed" wins over "failed"
+        if (failed.get()) {
+            throw new JmsConnectionFailedException(firstFailureError);
+        }
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (this.closed.get()) {
+            throw new IllegalStateException("The Connection is closed"); // javax.jms.IllegalStateException, per this file's imports
+        }
+    }
+
+    protected JmsSessionId getNextSessionId() {
+        return new JmsSessionId(this.connectionInfo.getConnectionId(), this.sessionIdGenerator.incrementAndGet());
+    }
+
+    protected JmsTransactionId getNextTransactionId() {
+        return new JmsTransactionId(this.connectionInfo.getConnectionId(), this.transactionIdGenerator.incrementAndGet());
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Provider interface methods
+    ////////////////////////////////////////////////////////////////////////////
+
+    <T extends JmsResource> T createResource(T resource) throws JMSException {
+        checkClosedOrFailed();
+        // Ask the provider to create the resource and sync() on the request; the same instance is returned.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.create(resource, pending);
+            pending.sync();
+            return resource;
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void startResource(JmsResource resource) throws JMSException {
+        connect();
+        // Unlike createResource(), no closed/failed check is made before calling the provider.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.start(resource, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void destroyResource(JmsResource resource) throws JMSException {
+        connect();
+        // Unlike createResource(), no closed/failed check is made before calling the provider.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.destroy(resource, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void send(JmsOutboundMessageDispatch envelope) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+
+        // TODO - We don't currently have a way to say that an operation
+        //        should be done asynchronously.  A send can be done async
+        //        in many cases, such as non-persistent delivery.  We probably
+        //        don't need to do anything here though just have a way to
+        //        configure the provider for async sends which we do in the
+        //        JmsConnectionInfo.  Here we just need to register a listener
+        //        on the request to know when it completes if we want to do
+        //        JMS 2.0 style async sends where we signal a callback, then
+        //        we can manage order of callback events to async senders at
+        //        this level.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.send(envelope, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Send the acknowledgement for one delivery and sync() on the request.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.acknowledge(envelope, ackType, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void acknowledge(JmsSessionId sessionId) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Send the session-level acknowledgement and sync() on the request.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.acknowledge(sessionId, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void unsubscribe(String name) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Pass the subscription name to the provider and sync() on the request.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.unsubscribe(name, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void commit(JmsSessionId sessionId) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Issue the commit to the provider and sync() on the request; failures become JMSException.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.commit(sessionId, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void rollback(JmsSessionId sessionId) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Issue the rollback to the provider and sync() on the request; failures become JMSException.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.rollback(sessionId, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void recover(JmsSessionId sessionId) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Issue the recover to the provider and sync() on the request; failures become JMSException.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.recover(sessionId, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    void pull(JmsConsumerId consumerId, long timeout) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        // Forward the pull (consumer id and timeout) to the provider and sync() on the request.
+        try {
+            ProviderFuture pending = new ProviderFuture();
+            provider.pull(consumerId, timeout, pending);
+            pending.sync();
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Property setters and getters
+    ////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * @return the listener last passed to setExceptionListener, or null if none was set
+     * @throws JMSException if closed or failed, see {@link javax.jms.Connection#getExceptionListener()}
+     */
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        checkClosedOrFailed();
+        return this.exceptionListener;
+    }
+
+    /**
+     * @param listener the listener that onException(JMSException) will notify; null disables notification
+     * @throws JMSException if closed or failed, see {@link javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)}
+     */
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        checkClosedOrFailed();
+        this.exceptionListener = listener;
+    }
+
+    /**
+     * Adds a JmsConnectionListener so that a client can be notified of events in
+     * the underlying protocol provider.  Listeners are kept in a set.
+     *
+     * @param listener
+     *        the new listener to add to the collection.
+     */
+    public void addConnectionListener(JmsConnectionListener listener) {
+        this.connectionListeners.add(listener);
+    }
+
+    /**
+     * Removes a JmsConnectionListener previously registered via addConnectionListener (despite the name).
+     *
+     * @param listener
+     *        the listener to remove from the collection.
+     */
+    public void removeTransportListener(JmsConnectionListener listener) {
+        this.connectionListeners.remove(listener);
+    }
+
+    public boolean isForceAsyncSend() {
+        return this.connectionInfo.isForceAsyncSend();
+    }
+
+    public void setForceAsyncSend(boolean forceAsyncSend) {
+        this.connectionInfo.setForceAsyncSends(forceAsyncSend);
+    }
+
+    public boolean isAlwaysSyncSend() {
+        return this.connectionInfo.isAlwaysSyncSend();
+    }
+
+    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+        this.connectionInfo.setAlwaysSyncSend(alwaysSyncSend); // kept on the JmsConnectionInfo, not on this class
+    }
+
+    public String getTopicPrefix() {
+        return this.connectionInfo.getTopicPrefix();
+    }
+
+    public void setTopicPrefix(String topicPrefix) {
+        this.connectionInfo.setTopicPrefix(topicPrefix);
+    }
+
+    public String getTempTopicPrefix() {
+        return this.connectionInfo.getTempTopicPrefix();
+    }
+
+    public void setTempTopicPrefix(String tempTopicPrefix) {
+        this.connectionInfo.setTempTopicPrefix(tempTopicPrefix);
+    }
+
+    public String getTempQueuePrefix() {
+        return this.connectionInfo.getTempQueuePrefix();
+    }
+
+    public void setTempQueuePrefix(String tempQueuePrefix) {
+        this.connectionInfo.setTempQueuePrefix(tempQueuePrefix);
+    }
+
+    public String getQueuePrefix() {
+        return this.connectionInfo.getQueuePrefix();
+    }
+
+    public void setQueuePrefix(String queuePrefix) {
+        this.connectionInfo.setQueuePrefix(queuePrefix);
+    }
+
+    public boolean isOmitHost() {
+        return this.connectionInfo.isOmitHost();
+    }
+
+    public void setOmitHost(boolean omitHost) {
+        this.connectionInfo.setOmitHost(omitHost);
+    }
+
+    public JmsPrefetchPolicy getPrefetchPolicy() {
+        return this.prefetchPolicy;
+    }
+
+    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy; // stored as given; null is not rejected here
+    }
+
+    public boolean isMessagePrioritySupported() {
+        return this.messagePrioritySupported;
+    }
+
+    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+        this.messagePrioritySupported = messagePrioritySupported; // plain field on this class, not on the connection info
+    }
+
+    public long getCloseTimeout() {
+        return this.connectionInfo.getCloseTimeout();
+    }
+
+    public void setCloseTimeout(long closeTimeout) {
+        this.connectionInfo.setCloseTimeout(closeTimeout);
+    }
+
+    public long getConnectTimeout() {
+        return this.connectionInfo.getConnectTimeout(); // kept on the JmsConnectionInfo, not on this class
+    }
+
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectionInfo.setConnectTimeout(connectTimeout); // kept on the JmsConnectionInfo, not on this class
+    }
+
+    public long getSendTimeout() {
+        return this.connectionInfo.getSendTimeout();
+    }
+
+    public void setSendTimeout(long sendTimeout) {
+        this.connectionInfo.setSendTimeout(sendTimeout);
+    }
+
+    public long getRequestTimeout() {
+        return this.connectionInfo.getRequestTimeout();
+    }
+
+    public void setRequestTimeout(long requestTimeout) {
+        connectionInfo.setRequestTimeout(requestTimeout);
+    }
+
+    /**
+     * @return the broker URI assigned to this connection, as last set via
+     *         {@link #setBrokerURI(URI)}.
+     */
+    public URI getBrokerURI() {
+        return this.brokerURI;
+    }
+
+    /**
+     * Records the broker URI for this connection.
+     *
+     * @param brokerURI
+     *        the broker URI to remember.
+     */
+    public void setBrokerURI(URI brokerURI) {
+        this.brokerURI = brokerURI;
+    }
+
+    /**
+     * @return the local URI assigned to this connection, as last set via
+     *         {@link #setLocalURI(URI)}.
+     */
+    public URI getLocalURI() {
+        return this.localURI;
+    }
+
+    /**
+     * Records the local URI for this connection.
+     *
+     * @param localURI
+     *        the local URI to remember.
+     */
+    public void setLocalURI(URI localURI) {
+        this.localURI = localURI;
+    }
+
+    /**
+     * @return the SSLContext assigned to this connection, as last set via
+     *         {@link #setSslContext(SSLContext)}.
+     */
+    public SSLContext getSslContext() {
+        return this.sslContext;
+    }
+
+    /**
+     * Records the SSLContext for this connection.
+     *
+     * @param sslContext
+     *        the SSLContext to remember.
+     */
+    public void setSslContext(SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * @return the user name held by this connection's {@code connectionInfo}.
+     */
+    public String getUsername() {
+        return this.connectionInfo.getUsername();
+    }
+
+    /**
+     * Stores the user name on this connection's {@code connectionInfo}.
+     *
+     * @param username
+     *        the user name to store.
+     */
+    public void setUsername(String username) {
+        this.connectionInfo.setUsername(username);
+    }
+
+    /**
+     * @return the password held by this connection's {@code connectionInfo}.
+     */
+    public String getPassword() {
+        return this.connectionInfo.getPassword();
+    }
+
+    /**
+     * Stores the password on this connection's {@code connectionInfo}.
+     *
+     * @param password
+     *        the password to store.
+     */
+    public void setPassword(String password) {
+        this.connectionInfo.setPassword(password);
+    }
+
+    /**
+     * @return the Provider currently assigned to this connection.
+     */
+    public Provider getProvider() {
+        return this.provider;
+    }
+
+    /**
+     * Replaces the Provider assigned to this connection.
+     *
+     * @param provider
+     *        the Provider instance to use from now on.
+     */
+    void setProvider(Provider provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the current value of the connected flag.
+     */
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    /**
+     * @return the current value of the started flag.
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    /**
+     * @return the current value of the closed flag.
+     */
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    /**
+     * @return the connection id held by this connection's {@code connectionInfo}.
+     */
+    JmsConnectionId getConnectionId() {
+        return connectionInfo.getConnectionId();
+    }
+
+    /**
+     * @return the watchRemoteDestinations flag held by this connection's {@code connectionInfo}.
+     */
+    public boolean isWatchRemoteDestinations() {
+        return connectionInfo.isWatchRemoteDestinations();
+    }
+
+    /**
+     * Stores the watchRemoteDestinations flag on this connection's {@code connectionInfo}.
+     *
+     * @param watchRemoteDestinations
+     *        the flag value to store.
+     */
+    public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
+        connectionInfo.setWatchRemoteDestinations(watchRemoteDestinations);
+    }
+
+    /**
+     * @return the message factory currently assigned to this connection.
+     */
+    public JmsMessageFactory getMessageFactory() {
+        return this.messageFactory;
+    }
+
+    /**
+     * @return the current value of the sendAcksAsync option.
+     */
+    public boolean isSendAcksAsync() {
+        return this.sendAcksAsync;
+    }
+
+    /**
+     * Updates the sendAcksAsync option of this connection.
+     *
+     * @param sendAcksAsync
+     *        the new option value.
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+
+    /**
+     * Handles an inbound message dispatch: the carried message (if any) is switched to
+     * read-only mode, the envelope is passed to the dispatcher registered for its
+     * consumer id (if one exists), and finally every registered connection listener
+     * is notified.
+     *
+     * @param envelope
+     *        the dispatch wrapper carrying the message and its consumer id.
+     */
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+        // Incoming messages are placed in read-only mode (body and properties)
+        // before anything else gets to see them.
+        JmsMessage message = envelope.getMessage();
+        if (message != null) {
+            message.setReadOnlyBody(true);
+            message.setReadOnlyProperties(true);
+        }
+
+        // No dispatcher registered for the consumer id means no consumer delivery,
+        // but connection listeners are still informed below.
+        JmsMessageDispatcher target = dispatchers.get(envelope.getConsumerId());
+        if (target != null) {
+            target.onMessage(envelope);
+        }
+
+        for (JmsConnectionListener connectionListener : connectionListeners) {
+            connectionListener.onMessage(envelope);
+        }
+    }
+
+    /**
+     * Propagates a connection interruption: every session is told first, then every
+     * registered connection listener receives the remote URI.
+     *
+     * @param remoteURI
+     *        the URI handed on to the connection listeners.
+     */
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+        for (JmsSession activeSession : sessions) {
+            activeSession.onConnectionInterrupted();
+        }
+
+        for (JmsConnectionListener connectionListener : connectionListeners) {
+            connectionListener.onConnectionInterrupted(remoteURI);
+        }
+    }
+
+    /**
+     * Rebuilds this connection's state on the given provider: the connection resource
+     * is created (blocking until the request completes), each known temporary
+     * destination is re-created, and each session then runs its own recovery.
+     *
+     * @param provider
+     *        the Provider on which the resources are re-created.
+     *
+     * @throws Exception if any step of the recovery fails.
+     */
+    @Override
+    public void onConnectionRecovery(Provider provider) throws Exception {
+        // TODO - Recover Advisory Consumer once we can support it.
+
+        LOG.debug("Connection {} is starting recovery.", connectionInfo.getConnectionId());
+
+        // The connection itself must exist before anything that depends on it.
+        ProviderFuture connectionRequest = new ProviderFuture();
+        provider.create(connectionInfo, connectionRequest);
+        connectionRequest.sync();
+
+        for (JmsDestination temporary : tempDestinations.values()) {
+            createResource(temporary);
+        }
+
+        for (JmsSession activeSession : sessions) {
+            activeSession.onConnectionRecovery(provider);
+        }
+    }
+
+    /**
+     * Finishes a recovery cycle: the message factory is replaced with the one supplied
+     * by the given provider and each session is told that recovery has completed.
+     *
+     * @param provider
+     *        the Provider that supplies the new message factory.
+     *
+     * @throws Exception if a session fails while finalizing recovery.
+     */
+    @Override
+    public void onConnectionRecovered(Provider provider) throws Exception {
+        LOG.debug("Connection {} is finalizing recovery.", connectionInfo.getConnectionId());
+
+        messageFactory = provider.getMessageFactory();
+
+        for (JmsSession activeSession : sessions) {
+            activeSession.onConnectionRecovered(provider);
+        }
+    }
+
+    /**
+     * Propagates a restored connection: every session is told first, then every
+     * registered connection listener receives the remote URI.
+     *
+     * @param remoteURI
+     *        the URI handed on to the connection listeners.
+     */
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+        for (JmsSession activeSession : sessions) {
+            activeSession.onConnectionRestored();
+        }
+
+        for (JmsConnectionListener connectionListener : connectionListeners) {
+            connectionListener.onConnectionRestored(remoteURI);
+        }
+    }
+
+    @Override
+    public void onConnectionFailure(final IOException ex) {
+        onAsyncException(ex);
+        if (!closing.get() && !closed.get()) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    providerFailed(ex);
+                    if (provider != null) {
+                        try {
+                            provider.close();
+                        } catch (Throwable error) {
+                            LOG.debug("Error while closing failed Provider: {}", error.getMessage());
+                        }
+                    }
+
+                    try {
+                        shutdown();
+                    } catch (JMSException e) {
+                        LOG.warn("Exception during connection cleanup, " + e, e);
+                    }
+
+                    for (JmsConnectionListener listener : connectionListeners) {
+                        listener.onConnectionFailure(ex);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Handles any asynchronous errors that occur from the JMS framework classes.
+     *
+     * If any listeners are registered they will be notified of the error from a thread
+     * in the Connection's Executor service.
+     *
+     * @param error
+     *        The exception that triggered this error.
+     */
+    public void onAsyncException(Throwable error) {
+        if (!closed.get() && !closing.get()) {
+            if (this.exceptionListener != null) {
+
+                if (!(error instanceof JMSException)) {
+                    error = JmsExceptionSupport.create(error);
+                }
+                final JMSException jmsError = (JMSException)error;
+
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        JmsConnection.this.exceptionListener.onException(jmsError);
+                    }
+                });
+            } else {
+                LOG.debug("Async exception with no exception listener: " + error, error);
+            }
+        }
+    }
+
+    /**
+     * Marks this connection as failed and remembers the error that caused it.
+     *
+     * @param error
+     *        the error reported for the failed provider.
+     */
+    protected void providerFailed(IOException error) {
+        failed.set(true);
+        // Only the first reported error is kept; later calls leave it untouched.
+        // NOTE(review): this check-then-assign is not atomic by itself - confirm how
+        // firstFailureError is declared and which threads can call this method.
+        if (firstFailureError == null) {
+            firstFailureError = error;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
new file mode 100644
index 0000000..1333a5e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -0,0 +1,664 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.jndi.JNDIStorable;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.jms.util.URISupport.CompositeData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS ConnectionFactory Implementation.
+ */
+public class JmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionFactory.class);
+
+    private URI brokerURI;
+    private URI localURI;
+    private String username;
+    private String password;
+    private boolean forceAsyncSend;
+    private boolean alwaysSyncSend;
+    private boolean sendAcksAsync;
+    private boolean omitHost;
+    private boolean messagePrioritySupported = true;
+    private String queuePrefix = "queue://";
+    private String topicPrefix = "topic://";
+    private String tempQueuePrefix = "temp-queue://";
+    private String tempTopicPrefix = "temp-topic://";
+    private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+    private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+    private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+    private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
+    private boolean watchRemoteDestinations = true;
+    private IdGenerator clientIdGenerator;
+    private String clientIDPrefix;
+    private IdGenerator connectionIdGenerator;
+    private String connectionIDPrefix;
+    private ExceptionListener exceptionListener;
+
+    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+
+    /**
+     * Creates a factory with default settings and no broker URI.
+     */
+    public JmsConnectionFactory() {
+    }
+
+    /**
+     * Creates a factory with the given credentials and no broker URI.
+     *
+     * @param username
+     *        the user name applied to connections created without explicit credentials.
+     * @param password
+     *        the password applied to connections created without explicit credentials.
+     */
+    public JmsConnectionFactory(String username, String password) {
+        setUsername(username);
+        setPassword(password);
+    }
+
+    /**
+     * Creates a factory for the given broker URI.
+     *
+     * @param brokerURI
+     *        the broker URI in String form, see {@link #setBrokerURI(String)}.
+     *
+     * @throws IllegalArgumentException if the URI is null or cannot be applied.
+     */
+    public JmsConnectionFactory(String brokerURI) {
+        // Go straight through setBrokerURI() so that a null value is rejected with an
+        // IllegalArgumentException instead of failing with a NullPointerException.
+        setBrokerURI(brokerURI);
+    }
+
+    /**
+     * Creates a factory for the given broker URI.
+     *
+     * @param brokerURI
+     *        the broker URI, see {@link #setBrokerURI(String)}.
+     *
+     * @throws IllegalArgumentException if the URI is null or cannot be applied.
+     */
+    public JmsConnectionFactory(URI brokerURI) {
+        // A null URI is handed on as null so setBrokerURI() reports it consistently.
+        setBrokerURI(brokerURI != null ? brokerURI.toString() : null);
+    }
+
+    /**
+     * Creates a factory with the given credentials and broker URI.
+     *
+     * @param userName
+     *        the user name applied to connections created without explicit credentials.
+     * @param password
+     *        the password applied to connections created without explicit credentials.
+     * @param brokerURI
+     *        the broker URI, see {@link #setBrokerURI(String)}.
+     *
+     * @throws IllegalArgumentException if the URI is null or cannot be applied.
+     */
+    public JmsConnectionFactory(String userName, String password, URI brokerURI) {
+        setUsername(userName);
+        setPassword(password);
+        // A null URI is handed on as null so setBrokerURI() reports it consistently.
+        setBrokerURI(brokerURI != null ? brokerURI.toString() : null);
+    }
+
+    /**
+     * Creates a factory with the given credentials and broker URI.
+     *
+     * @param userName
+     *        the user name applied to connections created without explicit credentials.
+     * @param password
+     *        the password applied to connections created without explicit credentials.
+     * @param brokerURI
+     *        the broker URI in String form, see {@link #setBrokerURI(String)}.
+     *
+     * @throws IllegalArgumentException if the URI is null or cannot be applied.
+     */
+    public JmsConnectionFactory(String userName, String password, String brokerURI) {
+        setUsername(userName);
+        setPassword(password);
+        setBrokerURI(brokerURI);
+    }
+
+    /**
+     * Applies the entries of a {@link Properties} object to this factory.  Every key and
+     * value is converted with {@code toString()} and the resulting map is passed to
+     * {@link #setProperties(Map)}.
+     *
+     * @param props
+     *        the properties to apply.
+     */
+    public void setProperties(Properties props) {
+        Map<String, String> converted = new HashMap<String, String>();
+        for (Map.Entry<Object, Object> property : props.entrySet()) {
+            String key = property.getKey().toString();
+            String value = property.getValue().toString();
+            converted.put(key, value);
+        }
+        setProperties(converted);
+    }
+
+    /**
+     * Applies the given properties to this factory by delegating to
+     * {@link #buildFromProperties(Map)}.
+     *
+     * @param map
+     *        the property names and values to apply.
+     */
+    @Override
+    public void setProperties(Map<String, String> map) {
+        buildFromProperties(map);
+    }
+
+    /**
+     * Hands the given map to PropertyUtil.setProperties with this factory as the
+     * target object.  The boolean result of that call is ignored here.
+     *
+     * @param map
+     *        the property names and values to apply.
+     */
+    @Override
+    protected void buildFromProperties(Map<String, String> map) {
+        PropertyUtil.setProperties(this, map);
+    }
+
+    /**
+     * Copies the properties that PropertyUtil reads from this factory into the given map.
+     * A failure while reading them is logged and otherwise ignored, in which case the
+     * map is left unchanged.
+     *
+     * @param map
+     *        the map that receives the factory's properties.
+     */
+    @Override
+    protected void populateProperties(Map<String, String> map) {
+        try {
+            Map<String, String> result = PropertyUtil.getProperties(this);
+            map.putAll(result);
+        } catch (Exception e) {
+            // Report through the class logger rather than printing to stderr.
+            LOG.warn("Failed to populate properties from the connection factory", e);
+        }
+    }
+
+    /**
+     * Creates a TopicConnection using the user name and password set on this factory.
+     *
+     * @return a TopicConnection
+     * @throws JMSException if the connection cannot be created.
+     * @see javax.jms.TopicConnectionFactory#createTopicConnection()
+     */
+    @Override
+    public TopicConnection createTopicConnection() throws JMSException {
+        String user = getUsername();
+        String pass = getPassword();
+        return createTopicConnection(user, pass);
+    }
+
+    /**
+     * Creates a TopicConnection with a freshly generated connection id and a new
+     * Provider for the configured broker URI, then applies this factory's settings
+     * and the given credentials to it.
+     *
+     * @param username
+     *        the user name to set on the new connection.
+     * @param password
+     *        the password to set on the new connection.
+     * @return a TopicConnection
+     * @throws JMSException if any step of the creation fails.
+     * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String,
+     *      java.lang.String)
+     */
+    @Override
+    public TopicConnection createTopicConnection(String username, String password) throws JMSException {
+        try {
+            String id = getConnectionIdGenerator().generateId();
+            Provider topicProvider = createProvider(brokerURI);
+            JmsTopicConnection connection = new JmsTopicConnection(id, topicProvider, getClientIdGenerator());
+            return configureConnection(connection, username, password);
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    /**
+     * Creates a Connection using the user name and password set on this factory.
+     *
+     * @return a Connection
+     * @throws JMSException if the connection cannot be created.
+     * @see javax.jms.ConnectionFactory#createConnection()
+     */
+    @Override
+    public Connection createConnection() throws JMSException {
+        String user = getUsername();
+        String pass = getPassword();
+        return createConnection(user, pass);
+    }
+
+    /**
+     * Creates a Connection with a freshly generated connection id and a new Provider
+     * for the configured broker URI, then applies this factory's settings and the
+     * given credentials to it.
+     *
+     * @param username
+     *        the user name to set on the new connection.
+     * @param password
+     *        the password to set on the new connection.
+     * @return Connection
+     * @throws JMSException if any step of the creation fails.
+     * @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
+     */
+    @Override
+    public Connection createConnection(String username, String password) throws JMSException {
+        try {
+            String id = getConnectionIdGenerator().generateId();
+            Provider connectionProvider = createProvider(brokerURI);
+            JmsConnection connection = new JmsConnection(id, connectionProvider, getClientIdGenerator());
+            return configureConnection(connection, username, password);
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    /**
+     * Creates a QueueConnection using the user name and password set on this factory.
+     *
+     * @return a QueueConnection
+     * @throws JMSException if the connection cannot be created.
+     * @see javax.jms.QueueConnectionFactory#createQueueConnection()
+     */
+    @Override
+    public QueueConnection createQueueConnection() throws JMSException {
+        String user = getUsername();
+        String pass = getPassword();
+        return createQueueConnection(user, pass);
+    }
+
+    /**
+     * Creates a QueueConnection with a freshly generated connection id and a new
+     * Provider for the configured broker URI, then applies this factory's settings
+     * and the given credentials to it.
+     *
+     * @param username
+     *        the user name to set on the new connection.
+     * @param password
+     *        the password to set on the new connection.
+     * @return a QueueConnection
+     * @throws JMSException if any step of the creation fails.
+     * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String,
+     *      java.lang.String)
+     */
+    @Override
+    public QueueConnection createQueueConnection(String username, String password) throws JMSException {
+        try {
+            String id = getConnectionIdGenerator().generateId();
+            Provider queueProvider = createProvider(brokerURI);
+            JmsQueueConnection connection = new JmsQueueConnection(id, queueProvider, getClientIdGenerator());
+            return configureConnection(connection, username, password);
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    /**
+     * Transfers this factory's configuration to a newly created connection.  The
+     * properties read from the factory are applied first; the exception listener,
+     * the given credentials and the broker URI are set afterwards, so the explicit
+     * user name and password are the last values written.
+     *
+     * @param connection
+     *        the connection to configure.
+     * @param username
+     *        the user name to set on the connection.
+     * @param password
+     *        the password to set on the connection.
+     *
+     * @return the same connection instance that was passed in.
+     *
+     * @throws JMSException if any step of the configuration fails.
+     */
+    protected <T extends JmsConnection> T configureConnection(T connection, String username, String password) throws JMSException {
+        try {
+            Map<String, String> factorySettings = PropertyUtil.getProperties(this);
+            PropertyUtil.setProperties(connection, factorySettings);
+
+            connection.setExceptionListener(exceptionListener);
+            connection.setUsername(username);
+            connection.setPassword(password);
+            connection.setBrokerURI(brokerURI);
+
+            return connection;
+        } catch (Exception cause) {
+            throw JmsExceptionSupport.create(cause);
+        }
+    }
+
+    protected Provider createProvider(URI brokerURI) throws Exception {
+        Provider result = null;
+
+        try {
+            result = ProviderFactory.createAsync(brokerURI);
+        } catch (Exception ex) {
+            LOG.error("Failed to create JMS Provider instance for: {}", brokerURI.getScheme());
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+
+        return result;
+    }
+
+    /**
+     * Parses the given String into a URI.
+     *
+     * @param name
+     *        the text to parse; may be null.
+     *
+     * @return the parsed URI, or null when the input is null, empty or only whitespace.
+     *
+     * @throws IllegalArgumentException if the text is not a valid URI; the
+     *         URISyntaxException is attached as the cause.
+     */
+    protected static URI createURI(String name) {
+        if (name == null || name.trim().isEmpty()) {
+            return null;
+        }
+
+        try {
+            return new URI(name);
+        } catch (URISyntaxException syntaxError) {
+            throw new IllegalArgumentException("Invalid broker URI: " + name, syntaxError);
+        }
+    }
+
+    /**
+     * Returns the generator used for connection ids, creating it on first use.  The
+     * configured connectionIDPrefix is passed to the new generator when one is set;
+     * a prefix assigned after the generator exists does not affect it.
+     *
+     * @return the connection id generator, never null.
+     */
+    protected synchronized IdGenerator getConnectionIdGenerator() {
+        if (connectionIdGenerator == null) {
+            if (connectionIDPrefix != null) {
+                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+            } else {
+                connectionIdGenerator = new IdGenerator();
+            }
+        }
+        return connectionIdGenerator;
+    }
+
+    /**
+     * Replaces the generator used for connection ids.  Synchronized on the same lock as
+     * the lazily initializing getter so the two cannot interleave.
+     *
+     * @param connectionIdGenerator
+     *        the generator to use, or null to have a new one created on next use.
+     */
+    protected synchronized void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
+        this.connectionIdGenerator = connectionIdGenerator;
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Property getters and setters
+    //////////////////////////////////////////////////////////////////////////
+
+    /**
+     * @return the brokerURI in String form, or an empty String when none is set.
+     */
+    public String getBrokerURI() {
+        if (this.brokerURI == null) {
+            return "";
+        }
+        return this.brokerURI.toString();
+    }
+
+    /**
+     * Sets the broker URI and applies any options prefixed with "jms." that it carries
+     * to this factory.  For a URI with a query string the options are taken from the
+     * query and the stored URI is rebuilt from the parsed query map; for a composite
+     * URI they are taken from the composite parameters and the stored URI is rebuilt
+     * from the composite data.
+     *
+     * @param brokerURI
+     *        the brokerURI to set
+     *
+     * @throws IllegalArgumentException if the value is null, empty, not a valid URI,
+     *         or carries jms options that cannot be set on this factory.
+     */
+    public void setBrokerURI(String brokerURI) {
+        if (brokerURI == null) {
+            throw new IllegalArgumentException("brokerURI cannot be null");
+        }
+
+        URI parsed = createURI(brokerURI);
+        if (parsed == null) {
+            // createURI() returns null for empty or whitespace-only input; report that
+            // directly instead of running into a NullPointerException below.
+            throw new IllegalArgumentException("brokerURI cannot be empty");
+        }
+        this.brokerURI = parsed;
+
+        try {
+            if (this.brokerURI.getQuery() != null) {
+                Map<String, String> map = PropertyUtil.parseQuery(this.brokerURI.getQuery());
+                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");
+                applyJmsOptions(jmsOptionsMap);
+                this.brokerURI = PropertyUtil.replaceQuery(this.brokerURI, map);
+            } else if (URISupport.isCompositeURI(this.brokerURI)) {
+                CompositeData data = URISupport.parseComposite(this.brokerURI);
+                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");
+                applyJmsOptions(jmsOptionsMap);
+                this.brokerURI = data.toURI();
+            }
+        } catch (IllegalArgumentException e) {
+            // Already the documented exception type, pass it on untouched.
+            throw e;
+        } catch (Exception e) {
+            // Keep the original exception as the cause so the failure can be diagnosed.
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Applies the given jms options to this factory and fails if any could not be set.
+     *
+     * @param jmsOptionsMap
+     *        the options (as produced by filtering on the "jms." prefix) to apply.
+     *
+     * @throws IllegalArgumentException if PropertyUtil reports that not every option
+     *         could be set.
+     */
+    private void applyJmsOptions(Map<String, String> jmsOptionsMap) {
+        if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
+            String msg = ""
+                + " Not all jms options could be set on the ConnectionFactory."
+                + " Check the options are spelled correctly."
+                + " Given parameters=[" + jmsOptionsMap + "]."
+                + " This connection factory cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+    }
+
+    /**
+     * @return the localURI
+     */
+    public String getLocalURI() {
+        return this.localURI != null ? this.localURI.toString() : "";
+    }
+
+    /**
+     * @param localURI
+     *        the localURI to set
+     */
+    public void setLocalURI(String localURI) {
+        this.localURI = createURI(localURI);
+    }
+
+    /**
+     * Returns the user name used by the create methods that take no credentials.
+     *
+     * @return the username
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * Sets the user name used by the create methods that take no credentials.
+     *
+     * @param username
+     *        the username to set
+     */
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    /**
+     * Returns the password used by the create methods that take no credentials.
+     *
+     * @return the password
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Sets the password used by the create methods that take no credentials.
+     *
+     * @param password
+     *        the password to set
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @return the forceAsyncSend configuration option.
+     */
+    public boolean isForceAsyncSend() {
+        return this.forceAsyncSend;
+    }
+
+    /**
+     * @param forceAsyncSend
+     *        the forceAsyncSend value to set.
+     */
+    public void setForceAsyncSend(boolean forceAsyncSend) {
+        this.forceAsyncSend = forceAsyncSend;
+    }
+
+    /**
+     * @return the omitHost configuration option.
+     */
+    public boolean isOmitHost() {
+        return this.omitHost;
+    }
+
+    /**
+     * @param omitHost
+     *        the omitHost value to set.
+     */
+    public void setOmitHost(boolean omitHost) {
+        this.omitHost = omitHost;
+    }
+
+    /**
+     * @return the messagePrioritySupported configuration option, true unless changed.
+     */
+    public boolean isMessagePrioritySupported() {
+        return messagePrioritySupported;
+    }
+
+    /**
+     * Enables message priority support in MessageConsumer instances.  When enabled,
+     * all prefetched messages are dispatched in priority order.
+     *
+     * @param messagePrioritySupported the messagePrioritySupported to set
+     */
+    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+        this.messagePrioritySupported = messagePrioritySupported;
+    }
+
+    /**
+     * Returns the prefix applied to Queues that are created by the client.
+     *
+     * @return the currently configured Queue prefix, "queue://" unless changed.
+     */
+    public String getQueuePrefix() {
+        return this.queuePrefix;
+    }
+
+    /**
+     * Sets the prefix applied to Queues that are created by the client.
+     *
+     * @param queuePrefix
+     *        the Queue prefix to use.
+     */
+    public void setQueuePrefix(String queuePrefix) {
+        this.queuePrefix = queuePrefix;
+    }
+
+    /**
+     * Returns the prefix applied to Temporary Queues that are created by the client.
+     *
+     * @return the currently configured Temporary Queue prefix, "temp-queue://" unless changed.
+     */
+    public String getTempQueuePrefix() {
+        return this.tempQueuePrefix;
+    }
+
+    /**
+     * Sets the prefix applied to Temporary Queues that are created by the client.
+     *
+     * @param tempQueuePrefix
+     *        the Temporary Queue prefix to use.
+     */
+    public void setTempQueuePrefix(String tempQueuePrefix) {
+        this.tempQueuePrefix = tempQueuePrefix;
+    }
+
+    /**
+     * Returns the prefix applied to Temporary Topics that are created by the client.
+     *
+     * @return the currently configured Temporary Topic prefix, "temp-topic://" unless changed.
+     */
+    public String getTempTopicPrefix() {
+        return this.tempTopicPrefix;
+    }
+
+    /**
+     * Sets the prefix applied to Temporary Topics that are created by the client.
+     *
+     * @param tempTopicPrefix
+     *        the Temporary Topic prefix to use.
+     */
+    public void setTempTopicPrefix(String tempTopicPrefix) {
+        this.tempTopicPrefix = tempTopicPrefix;
+    }
+
+    /**
+     * Returns the prefix applied to Topics that are created by the client.
+     *
+     * @return the currently configured Topic prefix, "topic://" unless changed.
+     */
+    public String getTopicPrefix() {
+        return this.topicPrefix;
+    }
+
+    /**
+     * Sets the prefix applied to Topics that are created by the client.
+     *
+     * @param topicPrefix
+     *        the Topic prefix to use.
+     */
+    public void setTopicPrefix(String topicPrefix) {
+        this.topicPrefix = topicPrefix;
+    }
+
+    /**
+     * Gets the currently set close timeout.
+     *
+     * @return the close timeout in milliseconds.
+     */
+    public long getCloseTimeout() {
+        return this.closeTimeout;
+    }
+
+    /**
+     * Sets the close timeout used to control how long a Connection close will wait for
+     * clean shutdown of the connection before giving up.  A negative value means wait
+     * forever.
+     *
+     * Care should be taken in that a very short close timeout can cause the client to
+     * not cleanly shut down the connection and its resources.
+     *
+     * @param closeTimeout
+     *        time in milliseconds to wait for a clean connection close.
+     */
+    public void setCloseTimeout(long closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
+
+    /**
+     * Returns the currently configured wire level connect timeout.
+     *
+     * @return the connect timeout in milliseconds.
+     */
+    public long getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    /**
+     * Sets the timeout value used to control how long a client will wait for a successful
+     * connection to the remote peer to be established before considering the attempt to
+     * have failed.  This value does not control the socket level connection timeout but
+     * rather the connection handshake at the wire level; to control the socket level
+     * timeouts use the standard socket options configuration values.
+     *
+     * @param connectTimeout
+     *        the time in milliseconds to wait for the protocol connection handshake to complete.
+     */
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    /**
+     * @return the currently configured send timeout.
+     */
+    public long getSendTimeout() {
+        return this.sendTimeout;
+    }
+
+    /**
+     * @param sendTimeout
+     *        the send timeout to set.
+     */
+    public void setSendTimeout(long sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
+    /**
+     * @return the currently configured request timeout.
+     */
+    public long getRequestTimeout() {
+        return this.requestTimeout;
+    }
+
+    /**
+     * @param requestTimeout
+     *        the request timeout to set.
+     */
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    /**
+     * @return the prefetch policy currently configured on this factory.
+     */
+    public JmsPrefetchPolicy getPrefetchPolicy() {
+        return this.prefetchPolicy;
+    }
+
+    /**
+     * @param prefetchPolicy
+     *        the prefetch policy to configure on this factory.
+     */
+    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy;
+    }
+
+    /**
+     * @return the prefix used for auto-generated JMS Client ID values, or null if none is set.
+     */
+    public String getClientIDPrefix() {
+        return this.clientIDPrefix;
+    }
+
+    /**
+     * Sets the prefix used by auto-generated JMS Client ID values which are used if the JMS
+     * client does not explicitly specify one.
+     *
+     * @param clientIDPrefix
+     *        the prefix to use for generated Client ID values.
+     */
+    public void setClientIDPrefix(String clientIDPrefix) {
+        this.clientIDPrefix = clientIDPrefix;
+    }
+
+    /**
+     * Returns the generator used for client ids, creating it on first use.  The
+     * configured clientIDPrefix is passed to the new generator when one is set; a
+     * prefix assigned after the generator exists does not affect it.
+     *
+     * @return the client id generator, never null.
+     */
+    protected synchronized IdGenerator getClientIdGenerator() {
+        if (clientIdGenerator == null) {
+            if (clientIDPrefix != null) {
+                clientIdGenerator = new IdGenerator(clientIDPrefix);
+            } else {
+                clientIdGenerator = new IdGenerator();
+            }
+        }
+        return clientIdGenerator;
+    }
+
+    /**
+     * Replaces the generator used for client ids.  Synchronized on the same lock as the
+     * lazily initializing getter so the two cannot interleave.
+     *
+     * @param clientIdGenerator
+     *        the generator to use, or null to have a new one created on next use.
+     */
+    protected synchronized void setClientIdGenerator(IdGenerator clientIdGenerator) {
+        this.clientIdGenerator = clientIdGenerator;
+    }
+
+    /**
+     * Sets the prefix used by the connection id generator.  The prefix is read when
+     * the generator is first created.
+     *
+     * @param connectionIDPrefix
+     *        The string prefix used on all connection ids created by this factory.
+     */
+    public void setConnectionIDPrefix(String connectionIDPrefix) {
+        this.connectionIDPrefix = connectionIDPrefix;
+    }
+
+    /**
+     * Gets the currently configured JMS ExceptionListener that will be set on all
+     * new Connection objects created from this factory.
+     *
+     * @return the currently configured JMS ExceptionListener, or null if none is set.
+     */
+    public ExceptionListener getExceptionListener() {
+        return this.exceptionListener;
+    }
+
+    /**
+     * Sets the JMS ExceptionListener that will be set on all new Connection objects
+     * created from this factory.
+     *
+     * @param exceptionListener
+     *        the JMS ExceptionListener to apply to new Connections, or null to clear.
+     */
+    public void setExceptionListener(ExceptionListener exceptionListener) {
+        this.exceptionListener = exceptionListener;
+    }
+
+    /**
+     * Indicates if the Connections created from this factory will watch for updates
+     * from the remote peer informing of temporary destination creation and destruction.
+     *
+     * @return true if destination monitoring is enabled, which it is unless changed.
+     */
+    public boolean isWatchRemoteDestinations() {
+        return this.watchRemoteDestinations;
+    }
+
+    /**
+     * Enable or disable monitoring of remote temporary destination life-cycles.
+     *
+     * @param watchRemoteDestinations
+     *        true if connection instances should monitor remote destination life-cycles.
+     */
+    public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
+        this.watchRemoteDestinations = watchRemoteDestinations;
+    }
+
+    /**
+     * Returns true if the client should always send messages using a synchronous
+     * send operation regardless of persistence mode, or inside a transaction.
+     *
+     * @return true if sends should always be done synchronously.
+     */
+    public boolean isAlwaysSyncSend() {
+        return this.alwaysSyncSend;
+    }
+
+    /**
+     * Configures whether the client will always send messages synchronously,
+     * regardless of other factors that might result in an asynchronous send.
+     *
+     * @param alwaysSyncSend
+     *        if true sends are always done synchronously.
+     */
+    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+        this.alwaysSyncSend = alwaysSyncSend;
+    }
+
+    /**
+     * @return true if consumer acknowledgments are sent asynchronously, false otherwise.
+     */
+    public boolean isSendAcksAsync() {
+        return this.sendAcksAsync;
+    }
+
+    /**
+     * Should the message acknowledgments from a consumer be sent synchronously or
+     * asynchronously.  Sending the acknowledgments asynchronously can increase the
+     * performance of a consumer but opens up the possibility of a missed message
+     * acknowledge should the connection be unstable.
+     *
+     * @param sendAcksAsync
+     *        true to have the client send all message acknowledgments asynchronously.
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
new file mode 100644
index 0000000..2439760
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Provides an interface for clients to listen to events related to
+ * a JmsConnection.
+ */
+public interface JmsConnectionListener {
+
+    /**
+     * Called when an unrecoverable error occurs and the Connection must be closed.
+     *
+     * @param error
+     *        The error that triggered the failure.
+     */
+    void onConnectionFailure(Throwable error);
+
+    /**
+     * Called when the Connection to the remote peer is lost.
+     *
+     * @param remoteURI
+     *        The URI of the Broker previously connected to.
+     */
+    void onConnectionInterrupted(URI remoteURI);
+
+    /**
+     * Called when normal communication has been restored to a remote peer.
+     *
+     * @param remoteURI
+     *        The URI of the Broker that this client is now connected to.
+     */
+    void onConnectionRestored(URI remoteURI);
+
+    /**
+     * Called when a Connection is notified that a new Message has arrived for
+     * one of its currently active subscriptions.
+     *
+     * @param envelope
+     *        The envelope that contains the incoming message and its delivery information.
+     */
+    void onMessage(JmsInboundMessageDispatch envelope);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
new file mode 100644
index 0000000..f674320
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Enumeration;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.ConnectionMetaData;
+
+/**
+ * A <CODE>ConnectionMetaData</CODE> object provides information describing
+ * the <CODE>Connection</CODE> object.
+ * <p>
+ * The provider version is resolved once, when this class is initialized: first from
+ * the implementation version reported for the <CODE>org.apache.qpid.jms</CODE> package
+ * and, when that yields nothing, from the first line of the
+ * <CODE>/org/apache/qpid/jms/version.txt</CODE> class path resource.  When neither source
+ * is available {@link #PROVIDER_VERSION} is <CODE>null</CODE> and the major / minor
+ * versions are zero.
+ */
+public final class JmsConnectionMetaData implements ConnectionMetaData {
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+
+    public static final JmsConnectionMetaData INSTANCE = new JmsConnectionMetaData();
+
+    static {
+        // Prefer the version recorded for the package, fall back to the bundled
+        // version.txt resource whenever the package does not supply one.
+        String version = readPackageVersion();
+        if (version == null) {
+            version = readVersionResource();
+        }
+
+        int major = 0;
+        int minor = 0;
+        if (version != null) {
+            Matcher m = Pattern.compile("(\\d+)\\.(\\d+).*").matcher(version);
+            if (m.matches()) {
+                try {
+                    // Parse both parts before assigning so the pair is never half updated.
+                    int parsedMajor = Integer.parseInt(m.group(1));
+                    int parsedMinor = Integer.parseInt(m.group(2));
+                    major = parsedMajor;
+                    minor = parsedMinor;
+                } catch (NumberFormatException ignored) {
+                    // The digits do not fit into an int, report 0.0 rather than fail class init.
+                }
+            }
+        }
+
+        PROVIDER_VERSION = version;
+        PROVIDER_MAJOR_VERSION = major;
+        PROVIDER_MINOR_VERSION = minor;
+    }
+
+    private JmsConnectionMetaData() {}
+
+    /**
+     * Looks up the implementation version reported for the client package.
+     *
+     * @return the package implementation version, or null if the package is not
+     *         known or carries no implementation version.
+     */
+    private static String readPackageVersion() {
+        try {
+            Package p = Package.getPackage("org.apache.qpid.jms");
+            return p != null ? p.getImplementationVersion() : null;
+        } catch (Exception ignored) {
+            // Best effort only, a failed lookup must not break class initialization.
+            return null;
+        }
+    }
+
+    /**
+     * Reads the first line of the bundled version.txt resource.
+     *
+     * @return the first line of the resource, or null if the resource is missing,
+     *         empty or cannot be read.
+     */
+    private static String readVersionResource() {
+        InputStream in = JmsConnectionMetaData.class.getResourceAsStream("/org/apache/qpid/jms/version.txt");
+        if (in == null) {
+            return null;
+        }
+
+        try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+            return reader.readLine();
+        } catch (Exception ignored) {
+            // Best effort only, a failed read must not break class initialization.
+            return null;
+        } finally {
+            // Always release the stream, also when reading failed.
+            try {
+                in.close();
+            } catch (Exception ignored) {
+                // Nothing sensible can be done about a failed close here.
+            }
+        }
+    }
+
+    /**
+     * Gets the JMS API version.
+     *
+     * @return the JMS API version
+     */
+    @Override
+    public String getJMSVersion() {
+        return "1.1";
+    }
+
+    /**
+     * Gets the JMS major version number.
+     *
+     * @return the JMS API major version number
+     */
+    @Override
+    public int getJMSMajorVersion() {
+        return 1;
+    }
+
+    /**
+     * Gets the JMS minor version number.
+     *
+     * @return the JMS API minor version number
+     */
+    @Override
+    public int getJMSMinorVersion() {
+        return 1;
+    }
+
+    /**
+     * Gets the JMS provider name.
+     *
+     * @return the JMS provider name
+     */
+    @Override
+    public String getJMSProviderName() {
+        return "QpidJMS";
+    }
+
+    /**
+     * Gets the JMS provider version.
+     *
+     * @return the JMS provider version, may be null when no version information
+     *         could be located at class initialization time.
+     */
+    @Override
+    public String getProviderVersion() {
+        return PROVIDER_VERSION;
+    }
+
+    /**
+     * Gets the JMS provider major version number.
+     *
+     * @return the JMS provider major version number
+     */
+    @Override
+    public int getProviderMajorVersion() {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    /**
+     * Gets the JMS provider minor version number.
+     *
+     * @return the JMS provider minor version number
+     */
+    @Override
+    public int getProviderMinorVersion() {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    /**
+     * Gets an enumeration of the JMSX property names.
+     *
+     * @return an Enumeration of JMSX property names
+     */
+    @Override
+    public Enumeration<String> getJMSXPropertyNames() {
+        // A fresh Vector per call keeps the returned Enumeration independent of other callers.
+        Vector<String> jmsxProperties = new Vector<String>();
+        jmsxProperties.add("JMSXUserID");
+        jmsxProperties.add("JMSXGroupID");
+        jmsxProperties.add("JMSXGroupSeq");
+        jmsxProperties.add("JMSXDeliveryCount");
+        return jmsxProperties.elements();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
new file mode 100644
index 0000000..a86e0f7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.jndi.JNDIStorable;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResourceVistor;
+
+/**
+ * Jms Destination
+ */
+public abstract class JmsDestination extends JNDIStorable implements JmsResource, Externalizable, javax.jms.Destination, Comparable<JmsDestination> {
+
+    protected transient String name;
+    protected transient boolean topic;
+    protected transient boolean temporary;
+    protected transient int hashValue;
+    protected transient JmsConnection connection;
+
+    protected JmsDestination(String name, boolean topic, boolean temporary) {
+        this.name = name;
+        this.topic = topic;
+        this.temporary = temporary;
+    }
+
+    public abstract JmsDestination copy();
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * @return name of destination
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the topic
+     */
+    public boolean isTopic() {
+        return this.topic;
+    }
+
+    /**
+     * @return the temporary
+     */
+    public boolean isTemporary() {
+        return this.temporary;
+    }
+
+    /**
+     * @return true if a Topic
+     */
+    public boolean isQueue() {
+        return !this.topic;
+    }
+
+    /**
+     * @param props
+     */
+    @Override
+    protected void buildFromProperties(Map<String, String> props) {
+        setName(getProperty(props, "name", ""));
+        Boolean bool = Boolean.valueOf(getProperty(props, "topic", Boolean.TRUE.toString()));
+        this.topic = bool.booleanValue();
+        bool = Boolean.valueOf(getProperty(props, "temporary", Boolean.FALSE.toString()));
+        this.temporary = bool.booleanValue();
+    }
+
+    /**
+     * @param props
+     */
+    @Override
+    protected void populateProperties(Map<String, String> props) {
+        props.put("name", getName());
+        props.put("topic", Boolean.toString(isTopic()));
+        props.put("temporary", Boolean.toString(isTemporary()));
+    }
+
+    /**
+     * @param other
+     *        the Object to be compared.
+     * @return a negative integer, zero, or a positive integer as this object is
+     *         less than, equal to, or greater than the specified object.
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    @Override
+    public int compareTo(JmsDestination other) {
+        if (other != null) {
+            if (this == other) {
+                return 0;
+            }
+            if (isTemporary() == other.isTemporary()) {
+                return getName().compareTo(other.getName());
+            }
+            return -1;
+        }
+        return -1;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        JmsDestination d = (JmsDestination) o;
+        return getName().equals(d.getName());
+    }
+
+    @Override
+    public int hashCode() {
+        if (hashValue == 0) {
+            hashValue = getName().hashCode();
+        }
+        return hashValue;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(getName());
+        out.writeBoolean(isTopic());
+        out.writeBoolean(isTemporary());
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        setName(in.readUTF());
+        this.topic = in.readBoolean();
+        this.temporary = in.readBoolean();
+    }
+
+    void setConnection(JmsConnection connection) {
+        this.connection = connection;
+    }
+
+    JmsConnection getConnection() {
+        return this.connection;
+    }
+
+    /**
+     * Attempts to delete the destination if there is an assigned Connection object.
+     *
+     * @throws JMSException if an error occurs or the provider doesn't support
+     *         delete of destinations from the client.
+     */
+    protected void tryDelete() throws JMSException {
+        if (connection != null) {
+            connection.deleteDestination(this);
+        }
+    }
+
+    @Override
+    public void visit(JmsResourceVistor visitor) throws Exception {
+        visitor.processDestination(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
new file mode 100644
index 0000000..b7ae1b9
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a TopicSubscriber that is Durable
+ */
+public class JmsDurableTopicSubscriber extends JmsTopicSubscriber {
+
+    /**
+     * Creates a durable TopicSubscriber.  All arguments are handed unchanged to the
+     * JmsTopicSubscriber constructor.
+     *
+     * @param id
+     *        the consumer id for this subscriber.
+     * @param s
+     *        the session passed to the parent constructor (presumably the session
+     *        that owns this subscriber, confirm against JmsTopicSubscriber).
+     * @param destination
+     *        the destination passed to the parent constructor (presumably the Topic
+     *        being subscribed to, confirm against JmsTopicSubscriber).
+     * @param name
+     *        the name passed to the parent constructor (presumably the durable
+     *        subscription name, confirm against JmsTopicSubscriber).
+     * @param noLocal
+     *        the noLocal flag passed to the parent constructor.
+     * @param selector
+     *        the selector passed to the parent constructor.
+     * @throws JMSException
+     *         if the parent constructor fails.
+     */
+    public JmsDurableTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException {
+        super(id, s, destination, name, noLocal, selector);
+    }
+
+    /**
+     * @return always true, this subscriber type is durable.
+     */
+    @Override
+    public boolean isDurableSubscription() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
new file mode 100644
index 0000000..c9395ba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the details of a Session operating inside of a local JMS transaction.
+ */
+public class JmsLocalTransactionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
+
+    private List<JmsTxSynchronization> synchronizations;
+    private final JmsSession session;
+    private final JmsConnection connection;
+    private JmsTransactionId transactionId;
+    private JmsTransactionListener listener;
+
+    public JmsLocalTransactionContext(JmsSession session) {
+        this.session = session;
+        this.connection = session.getConnection();
+    }
+
+    /**
+     * Adds the given Transaction synchronization to the current list.
+     *
+     * @param synchronization
+     *        the transaction synchronization to add.
+     */
+    public void addSynchronization(JmsTxSynchronization s) {
+        if (synchronizations == null) {
+            synchronizations = new ArrayList<JmsTxSynchronization>(10);
+        }
+        synchronizations.add(s);
+    }
+
+    /**
+     * Clears the current Transacted state.  This is usually done when the client
+     * detects that a failover has occurred and needs to create a new Transaction
+     * for a Session that was previously enlisted in a transaction.
+     */
+    public void clear() {
+        this.transactionId = null;
+        this.synchronizations = null;
+    }
+
+    /**
+     * Start a local transaction.
+     *
+     * @throws javax.jms.JMSException on internal error
+     */
+    public void begin() throws JMSException {
+        if (transactionId == null) {
+            synchronizations = null;
+
+            transactionId = connection.getNextTransactionId();
+            JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId);
+            connection.createResource(transaction);
+
+            if (listener != null) {
+                listener.onTransactionStarted();
+            }
+
+            LOG.debug("Begin: {}", transactionId);
+        }
+    }
+
+    /**
+     * Rolls back any work done in this transaction and releases any locks
+     * currently held.
+     *
+     * @throws JMSException
+     *         if the JMS provider fails to roll back the transaction due to some internal error.
+     */
+    public void rollback() throws JMSException {
+        if (transactionId != null) {
+            LOG.debug("Rollback: {} syncCount: {}", transactionId,
+                      (synchronizations != null ? synchronizations.size() : 0));
+
+            transactionId = null;
+            connection.rollback(session.getSessionId());
+
+            if (listener != null) {
+                listener.onTransactionRolledBack();
+            }
+        }
+
+        afterRollback();
+    }
+
+    /**
+     * Commits all work done in this transaction and releases any locks
+     * currently held.
+     *
+     * @throws JMSException
+     *         if the JMS provider fails to roll back the transaction due to some internal error.
+     */
+    public void commit() throws JMSException {
+        if (transactionId != null) {
+            LOG.debug("Commit: {} syncCount: {}", transactionId,
+                      (synchronizations != null ? synchronizations.size() : 0));
+
+            JmsTransactionId oldTransactionId = this.transactionId;
+            transactionId = null;
+            try {
+                connection.commit(session.getSessionId());
+                if (listener != null) {
+                    listener.onTransactionCommitted();
+                }
+                afterCommit();
+            } catch (JMSException cause) {
+                LOG.info("Commit failed for transaction: {}", oldTransactionId);
+                if (listener != null) {
+                    listener.onTransactionRolledBack();
+                }
+                afterRollback();
+                throw cause;
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JmsLocalTransactionContext{transactionId=" + transactionId + "}";
+    }
+
+    //------------- Getters and Setters --------------------------------------//
+
+    public JmsTransactionId getTransactionId() {
+        return this.transactionId;
+    }
+
+    public JmsTransactionListener getListener() {
+        return listener;
+    }
+
+    public void setListener(JmsTransactionListener listener) {
+        this.listener = listener;
+    }
+
+    public boolean isInTransaction() {
+        return this.transactionId != null;
+    }
+
+    //------------- Implementation methods -----------------------------------//
+
+    private void afterRollback() throws JMSException {
+        if (synchronizations == null) {
+            return;
+        }
+
+        Throwable firstException = null;
+        int size = synchronizations.size();
+        for (int i = 0; i < size; i++) {
+            try {
+                synchronizations.get(i).afterRollback();
+            } catch (Throwable thrown) {
+                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), thrown);
+                if (firstException == null) {
+                    firstException = thrown;
+                }
+            }
+        }
+        synchronizations = null;
+        if (firstException != null) {
+            throw JmsExceptionSupport.create(firstException);
+        }
+    }
+
+    private void afterCommit() throws JMSException {
+        if (synchronizations == null) {
+            return;
+        }
+
+        Throwable firstException = null;
+        int size = synchronizations.size();
+        for (int i = 0; i < size; i++) {
+            try {
+                synchronizations.get(i).afterCommit();
+            } catch (Throwable thrown) {
+                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), thrown);
+                if (firstException == null) {
+                    firstException = thrown;
+                }
+            }
+        }
+        synchronizations = null;
+        if (firstException != null) {
+            throw JmsExceptionSupport.create(firstException);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
new file mode 100644
index 0000000..31f53c6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * Interface implemented by MessageConsumer instances that support sending
+ * a notification event when a message has arrived while the consumer is not
+ * in asynchronous dispatch mode.
+ */
+public interface JmsMessageAvailableConsumer {
+
+    /**
+     * Sets the listener used to notify synchronous consumers that there is a message
+     * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+     *
+     * @param availableListener
+     *        the JmsMessageAvailableListener instance to signal.
+     */
+    void setAvailableListener(JmsMessageAvailableListener availableListener);
+
+    /**
+     * Gets the listener used to notify synchronous consumers that there is a message
+     * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+     *
+     * @return the currently configured message available listener instance.
+     */
+    JmsMessageAvailableListener getAvailableListener();
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org