You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:48 UTC

[24/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
new file mode 100644
index 0000000..a7ede86
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -0,0 +1,1002 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.selector.SelectorParser;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsMessageId;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * JMS Session implementation
+ */
+@SuppressWarnings("static-access")
+public class JmsSession implements Session, QueueSession, TopicSession, JmsMessageDispatcher {
+
+    private final JmsConnection connection;
+    private final int acknowledgementMode;
+    private final List<JmsMessageProducer> producers = new CopyOnWriteArrayList<JmsMessageProducer>();
+    private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
+    private MessageListener messageListener;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
+    private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
+        new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
+    private JmsPrefetchPolicy prefetchPolicy;
+    private JmsSessionInfo sessionInfo;
+    private ExecutorService executor;
+    private final ReentrantLock sendLock = new ReentrantLock();
+
+    private final AtomicLong consumerIdGenerator = new AtomicLong();
+    private final AtomicLong producerIdGenerator = new AtomicLong();
+    private JmsLocalTransactionContext transactionContext;
+    private JmsMessageFactory messageFactory;
+
+    /**
+     * Creates a new Session owned by the given Connection and asks the Connection to
+     * create the matching Session resource.
+     *
+     * @param connection
+     *        the Connection that owns this Session.
+     * @param sessionId
+     *        the identifier assigned to this Session.
+     * @param acknowledgementMode
+     *        the acknowledgement mode this Session operates in.
+     *
+     * @throws JMSException if an error occurs while the Session is being created.
+     */
+    protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+        this.connection = connection;
+        this.acknowledgementMode = acknowledgementMode;
+        // Built from the Connection's policy; presumably a copy so that per-session
+        // changes do not affect the Connection -- TODO confirm against JmsPrefetchPolicy.
+        this.prefetchPolicy = new JmsPrefetchPolicy(connection.getPrefetchPolicy());
+
+        setTransactionContext(new JmsLocalTransactionContext(this));
+
+        this.sessionInfo = new JmsSessionInfo(sessionId);
+        this.sessionInfo.setAcknowledgementMode(acknowledgementMode);
+        this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
+
+        // The Connection returns the info object to use from here on, so the field is
+        // reassigned with whatever createResource hands back.
+        this.sessionInfo = connection.createResource(sessionInfo);
+        this.messageFactory = connection.getMessageFactory();
+    }
+
+    int acknowledgementMode() {
+        return this.acknowledgementMode;
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Session methods
+    //////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public int getAcknowledgeMode() throws JMSException {
+        checkClosed();
+        return this.acknowledgementMode;
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        checkClosed();
+        return isTransacted();
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        checkClosed();
+        return this.messageListener;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        checkClosed();
+        this.messageListener = listener;
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        checkClosed();
+        if (getTransacted()) {
+            throw new javax.jms.IllegalStateException("Cannot call recover() on a transacted session");
+        }
+
+        this.connection.recover(getSessionId());
+    }
+
+    /**
+     * Commits the current transaction of this Session.
+     *
+     * @throws JMSException if the Session is closed, is not transacted, or the commit fails.
+     */
+    @Override
+    public void commit() throws JMSException {
+        checkClosed();
+
+        final boolean transacted = getTransacted();
+        if (transacted) {
+            this.transactionContext.commit();
+            return;
+        }
+
+        throw new javax.jms.IllegalStateException("Not a transacted session");
+    }
+
+    /**
+     * Rolls back the current transaction of this Session and then, on the Session's
+     * executor, asks every consumer to drain its message queue to its listener.
+     *
+     * @throws JMSException if the Session is closed, is not transacted, or the rollback fails.
+     */
+    @Override
+    public void rollback() throws JMSException {
+        checkClosed();
+        if (!getTransacted()) {
+            throw new javax.jms.IllegalStateException("Not a transacted session");
+        }
+
+        this.transactionContext.rollback();
+
+        // The drain is handed to the session executor rather than run on the caller's thread.
+        getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                for (JmsMessageConsumer c : consumers.values()) {
+                    c.drainMessageQueueToListener();
+                }
+            }
+        });
+    }
+
+    /**
+     * Not supported by this Session implementation.
+     *
+     * @throws RuntimeException wrapping the JMS IllegalStateException if the Session is closed.
+     * @throws UnsupportedOperationException in every other case.
+     */
+    @Override
+    public void run() {
+        try {
+            checkClosed();
+        } catch (IllegalStateException e) {
+            // This is the checked javax.jms.IllegalStateException; run() declares no
+            // checked exceptions, so it is wrapped in an unchecked one.
+            throw new RuntimeException(e);
+        }
+
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (!closed.get()) {
+            doClose();
+        }
+    }
+
+    /**
+     * Shutdown the Session and release all resources.  Once completed the Session can
+     * request that the Provider destroy the Session and its child resources.
+     *
+     * If the calling thread is interrupted on entry, its interrupted status is cleared
+     * for the duration of the close and restored before this method returns, including
+     * when the close fails with an exception.
+     *
+     * @throws JMSException if shutting down the Session or destroying its resource fails.
+     */
+    protected void doClose() throws JMSException {
+        // Thread.interrupted() clears the flag; presumably this keeps a pending interrupt
+        // from aborting the provider calls made while closing.
+        boolean interrupted = Thread.interrupted();
+        try {
+            shutdown();
+            this.connection.removeSession(this);
+            this.connection.destroyResource(sessionInfo);
+        } finally {
+            // Always hand the interrupt back to the caller, even if the close threw.
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * This method should terminate all Session resources and prepare for disposal of the
+     * Session.  It is called either from the Session close method or from the Connection
+     * when a close request is made and the Connection wants to cleanup all Session resources.
+     *
+     * This method should not attempt to send a destroy request to the Provider as that
+     * will either be done by another session method or is not needed when done by the parent
+     * Connection.
+     *
+     * Only the first call has any effect; later calls return immediately because the
+     * closed flag is already set.  Any transaction still in progress is rolled back on a
+     * best-effort basis.
+     *
+     * @throws JMSException if stopping the Session or shutting down a child resource fails.
+     */
+    protected void shutdown() throws JMSException {
+        if (closed.compareAndSet(false, true)) {
+            stop();
+            // Iterate over a snapshot of the consumers rather than the live map view.
+            for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) {
+                consumer.shutdown();
+            }
+
+            for (JmsMessageProducer producer : this.producers) {
+                producer.shutdown();
+            }
+
+            try {
+                if (getTransactionContext().isInTransaction()) {
+                    // Roll back through the transaction context directly.  The public
+                    // rollback() starts with checkClosed(), which always fails here because
+                    // the closed flag was set above, so the transaction was never rolled back.
+                    getTransactionContext().rollback();
+                }
+            } catch (JMSException ignored) {
+                // Best effort only: the Session is going away regardless of the outcome.
+            }
+        }
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Consumer creation
+    //////////////////////////////////////////////////////////////////////////
+
+    /**
+     * @param destination
+     * @return a MessageConsumer
+     * @throws JMSException
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        return createConsumer(destination, null);
+    }
+
+    /**
+     * @param destination
+     * @param messageSelector
+     * @return MessageConsumer
+     * @throws JMSException
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        return createConsumer(destination, messageSelector, false);
+    }
+
+    /**
+     * Creates a consumer on the given destination with an optional selector and
+     * no-local setting.
+     *
+     * @param destination
+     *        the destination to consume from, must not be null.
+     * @param messageSelector
+     *        the selector expression; null or blank means no selector.
+     * @param NoLocal
+     *        the no-local flag handed to the created consumer.
+     * @return the MessageConsumer
+     * @throws JMSException if the Session is closed, the destination is null, or the
+     *         selector is rejected.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
+        checkClosed();
+        checkDestination(destination);
+        // A blank selector is normalized to null by checkSelector.
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination);
+        // NOTE(review): a JmsTopicSubscriber is created whatever the destination type is
+        // (Queue or Topic) -- confirm this is intended.
+        JmsTopicSubscriber result = new JmsTopicSubscriber(getNextConsumerId(), this, dest, NoLocal, messageSelector);
+        result.init();
+        return result;
+    }
+
+    /**
+     * Creates a receiver on the given queue without a message selector.
+     *
+     * @param queue
+     *        the queue to receive from, must not be null.
+     * @return QueueReceiver
+     * @throws JMSException if the Session is closed or the queue is null.
+     * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
+     */
+    @Override
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        checkClosed();
+        checkDestination(queue);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
+        // NOTE(review): the selector is passed as "" here, whereas the two-argument
+        // overload passes null for a blank selector (via checkSelector) -- confirm the
+        // receiver treats both the same way.
+        JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, "");
+        result.init();
+        return result;
+    }
+
+    /**
+     * @param queue
+     * @param messageSelector
+     * @return QueueReceiver
+     * @throws JMSException
+     * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue,
+     *      java.lang.String)
+     */
+    @Override
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+        checkClosed();
+        checkDestination(queue);
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
+        JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, messageSelector);
+        result.init();
+        return result;
+    }
+
+    /**
+     * @param destination
+     * @return QueueBrowser
+     * @throws JMSException
+     * @see javax.jms.Session#createBrowser(javax.jms.Queue)
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue destination) throws JMSException {
+        return createBrowser(destination, null);
+    }
+
+    /**
+     * @param destination
+     * @param messageSelector
+     * @return QueueBrowser
+     * @throws JMSException
+     * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue destination, String messageSelector) throws JMSException {
+        checkClosed();
+        checkDestination(destination);
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination);
+        JmsQueueBrowser result = new JmsQueueBrowser(this, dest, messageSelector);
+        return result;
+    }
+
+    /**
+     * @param topic
+     * @return TopicSubscriber
+     * @throws JMSException
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        return createSubscriber(topic, null, false);
+    }
+
+    /**
+     * @param topic
+     * @param messageSelector
+     * @param noLocal
+     * @return TopicSubscriber
+     * @throws JMSException
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+        checkClosed();
+        checkDestination(topic);
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        JmsTopicSubscriber result = new JmsTopicSubscriber(getNextConsumerId(), this, dest, noLocal, messageSelector);
+        result.init();
+        return result;
+    }
+
+    /**
+     * @param topic
+     * @param name
+     * @return a TopicSubscriber
+     * @throws JMSException
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        return createDurableSubscriber(topic, name, null, false);
+    }
+
+    /**
+     * Creates a durable subscriber on the given topic.
+     *
+     * @param topic
+     *        the topic to subscribe to, must not be null.
+     * @param name
+     *        the name that identifies the durable subscription.
+     * @param messageSelector
+     *        the selector expression; null or blank means no selector.
+     * @param noLocal
+     *        the no-local flag handed to the created subscriber.
+     * @return TopicSubscriber
+     * @throws JMSException if the Session is closed, the topic is null, or the selector
+     *         is rejected.
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String, java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        checkClosed();
+        checkDestination(topic);
+        // A blank selector is normalized to null by checkSelector.
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        // Pass the caller's noLocal through; it used to be hard-coded to false, which
+        // silently ignored the argument.
+        JmsTopicSubscriber result = new JmsDurableTopicSubscriber(getNextConsumerId(), this, dest, name, noLocal, messageSelector);
+        result.init();
+        return result;
+    }
+
+    /**
+     * @param name
+     * @throws JMSException
+     * @see javax.jms.Session#unsubscribe(java.lang.String)
+     */
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        checkClosed();
+        this.connection.unsubscribe(name);
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Producer creation
+    //////////////////////////////////////////////////////////////////////////
+
+    /**
+     * @param destination
+     * @return MessageProducer
+     * @throws JMSException
+     * @see javax.jms.Session#createProducer(javax.jms.Destination)
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        checkClosed();
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination);
+        JmsMessageProducer result = new JmsMessageProducer(getNextProducerId(), this, dest);
+        add(result);
+        return result;
+    }
+
+    /**
+     * Creates a sender for the given queue and registers it with this Session, in the
+     * same way createProducer and createPublisher register the producers they create.
+     *
+     * @param queue
+     *        the queue the sender will send to.
+     * @return QueueSender
+     * @throws JMSException if the Session is closed or the sender cannot be created.
+     * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
+     */
+    @Override
+    public QueueSender createSender(Queue queue) throws JMSException {
+        checkClosed();
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
+        JmsQueueSender result = new JmsQueueSender(getNextProducerId(), this, dest);
+        // Track the sender so that Session shutdown and connection interruption/recovery,
+        // which iterate the producers list, also reach it.
+        add(result);
+        return result;
+    }
+
+    /**
+     * @param topic
+     * @return TopicPublisher
+     * @throws JMSException
+     * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+     */
+    @Override
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        checkClosed();
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        JmsTopicPublisher result = new JmsTopicPublisher(getNextProducerId(), this, dest);
+        add(result);
+        return result;
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Message creation
+    //////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createBytesMessage());
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createMapMessage());
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createMessage());
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createObjectMessage(null));
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        checkClosed();
+        return init(messageFactory.createObjectMessage(object));
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createStreamMessage());
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        checkClosed();
+        return init(messageFactory.createTextMessage(null));
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        checkClosed();
+        return init(messageFactory.createTextMessage(text));
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Destination creation
+    //////////////////////////////////////////////////////////////////////////
+
+    /**
+     * @param queueName
+     * @return Queue
+     * @throws JMSException
+     * @see javax.jms.Session#createQueue(java.lang.String)
+     */
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        checkClosed();
+        return new JmsQueue(queueName);
+    }
+
+    /**
+     * @param topicName
+     * @return Topic
+     * @throws JMSException
+     * @see javax.jms.Session#createTopic(java.lang.String)
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        checkClosed();
+        return new JmsTopic(topicName);
+    }
+
+    /**
+     * @return TemporaryQueue
+     * @throws JMSException
+     * @see javax.jms.Session#createTemporaryQueue()
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        checkClosed();
+        return connection.createTemporaryQueue();
+    }
+
+    /**
+     * @return TemporaryTopic
+     * @throws JMSException
+     * @see javax.jms.Session#createTemporaryTopic()
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        checkClosed();
+        return connection.createTemporaryTopic();
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+    // Session Implementation methods
+    //////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Registers a consumer with this Session and registers this Session with the
+     * Connection as the dispatcher for the consumer's id.  If the Session is already
+     * started the consumer is started as well.
+     *
+     * @param consumer
+     *        the consumer to register.
+     *
+     * @throws JMSException if registering the dispatcher or starting the consumer fails.
+     */
+    protected void add(JmsMessageConsumer consumer) throws JMSException {
+        this.consumers.put(consumer.getConsumerId(), consumer);
+        this.connection.addDispatcher(consumer.getConsumerId(), this);
+
+        if (started.get()) {
+            consumer.start();
+        }
+    }
+
+    protected void remove(JmsMessageConsumer consumer) throws JMSException {
+        this.connection.removeDispatcher(consumer.getConsumerId());
+        this.consumers.remove(consumer.getConsumerId());
+    }
+
+    protected void add(JmsMessageProducer producer) {
+        this.producers.add(producer);
+    }
+
+    protected void remove(MessageProducer producer) {
+        this.producers.remove(producer);
+    }
+
+    protected void onException(Exception ex) {
+        this.connection.onException(ex);
+    }
+
+    protected void onException(JMSException ex) {
+        this.connection.onException(ex);
+    }
+
+    protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+        JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
+        send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp);
+    }
+
+    /**
+     * Stamps the JMS headers onto the caller's message, transforms it into a JmsMessage
+     * copy, wraps the copy in an outbound envelope and hands it to the Connection.  The
+     * whole operation runs under the Session's send lock.
+     *
+     * @param producer
+     *        the producer performing the send; supplies the producer id and message sequence.
+     * @param destination
+     *        the destination the message is sent to.
+     * @param original
+     *        the caller's message; its JMS headers are updated in place.
+     * @param deliveryMode
+     *        the delivery mode to stamp on the message.
+     * @param priority
+     *        the priority to stamp on the message.
+     * @param timeToLive
+     *        lifetime in milliseconds; a value of zero or less means the message does not expire.
+     * @param disableMsgId
+     *        when true no message id is generated or assigned.
+     * @param disableTimestamp
+     *        when true the timestamp is set to zero unless a time to live requires one.
+     *
+     * @throws JMSException if stamping, transforming or sending the message fails.
+     */
+    private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+        sendLock.lock();
+        try {
+            startNextTransaction();
+
+            original.setJMSDeliveryMode(deliveryMode);
+            original.setJMSPriority(priority);
+            original.setJMSRedelivered(false);
+
+            // A timestamp is needed to compute the expiration even when timestamps are disabled.
+            long timeStamp = 0;
+            boolean hasTTL = timeToLive > 0;
+            if (!disableTimestamp || hasTTL) {
+                timeStamp = System.currentTimeMillis();
+            }
+
+            original.setJMSTimestamp(timeStamp);
+
+            if (hasTTL) {
+                original.setJMSExpiration(timeStamp + timeToLive);
+            } else {
+                // Without a time to live the message never expires.  Reset the header so a
+                // message that is being re-sent does not keep a stale expiration from an
+                // earlier send or receive.
+                original.setJMSExpiration(0);
+            }
+
+            JmsMessageId msgId = null;
+            if (!disableMsgId) {
+                msgId = getNextMessageId(producer);
+            }
+
+            boolean isJmsMessageType = original instanceof JmsMessage;
+            if (isJmsMessageType) {
+                ((JmsMessage) original).setConnection(connection);
+                if (!disableMsgId) {
+                    ((JmsMessage) original).setJMSMessageID(msgId);
+                }
+                original.setJMSDestination(destination);
+            }
+
+            JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original);
+
+            // Ensure original message gets the destination and message ID as per spec.
+            if (!isJmsMessageType) {
+                if (!disableMsgId) {
+                    original.setJMSMessageID(msgId.toString());
+                    copy.setJMSMessageID(msgId);
+                }
+                original.setJMSDestination(destination);
+                copy.setJMSDestination(destination);
+            }
+
+            // Synchronous when the Connection forces it, or for persistent messages sent
+            // outside a transaction unless the Connection forces asynchronous sends.
+            boolean sync = connection.isAlwaysSyncSend() ||
+                           (!connection.isForceAsyncSend() && deliveryMode == DeliveryMode.PERSISTENT && !getTransacted());
+
+            copy.onSend();
+            JmsOutboundMessageDispatch envelope = new JmsOutboundMessageDispatch();
+            envelope.setMessage(copy);
+            envelope.setProducerId(producer.getProducerId());
+            envelope.setDestination(destination);
+            envelope.setSendAsync(!sync);
+
+            this.connection.send(envelope);
+        } finally {
+            sendLock.unlock();
+        }
+    }
+
+    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+        startNextTransaction();
+        this.connection.acknowledge(envelope, ackType);
+    }
+
+    /**
+     * Acknowledge all previously delivered messages in this Session as consumed.  This
+     * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
+     *
+     * @throws JMSException if an error occurs while the acknowledge is processed.
+     */
+    void acknowledge() throws JMSException {
+        this.connection.acknowledge(sessionInfo.getSessionId());
+    }
+
+    public boolean isClosed() {
+        return this.closed.get();
+    }
+
+    /**
+     * Checks whether the session uses transactions.
+     *
+     * @return true - if the session uses transactions.
+     */
+    public boolean isTransacted() {
+        return this.acknowledgementMode == Session.SESSION_TRANSACTED;
+    }
+
+    /**
+     * Checks whether the session used client acknowledgment.
+     *
+     * @return true - if the session uses client acknowledgment.
+     */
+    protected boolean isClientAcknowledge() {
+        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
+    }
+
+    /**
+     * Checks whether the session uses auto acknowledgment.
+     *
+     * @return true - if the session uses auto acknowledgment.
+     */
+    public boolean isAutoAcknowledge() {
+        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
+    }
+
+    /**
+     * Checks whether the session uses dups ok acknowledgment.
+     *
+     * @return true - if the session uses dups ok acknowledgment.
+     */
+    public boolean isDupsOkAcknowledge() {
+        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
+    }
+
+    /**
+     * Verifies that this Session has not been closed.
+     *
+     * @throws IllegalStateException (the JMS variant) if the Session is closed.
+     */
+    protected void checkClosed() throws IllegalStateException {
+        if (this.closed.get()) {
+            // The message used to name the MessageProducer, which misled callers.
+            throw new IllegalStateException("The Session is closed");
+        }
+    }
+
+    // This extra wrapping class around SelectorParser is used to avoid
+    // ClassNotFoundException if SelectorParser is not in the class path.
+    static class OptionalSectorParser {
+        public static void check(String selector) throws InvalidSelectorException {
+            try {
+                SelectorParser.parse(selector);
+            } catch (FilterException e) {
+                throw new InvalidSelectorException(e.getMessage());
+            }
+        }
+    }
+
+    // Holds a parser instance when selector validation is usable, or null when the
+    // trial parse below failed; checkSelector skips validation when this is null.
+    static final OptionalSectorParser SELECTOR_PARSER;
+    static {
+        OptionalSectorParser parser;
+        try {
+            // lets verify it's working..
+            parser = new OptionalSectorParser();
+            parser.check("x=1");
+        } catch (Throwable e) {
+            // Throwable is caught on purpose so that errors raised when the selector
+            // parser classes cannot be loaded are handled as well as ordinary exceptions.
+            parser = null;
+        }
+        SELECTOR_PARSER = parser;
+    }
+
+    /**
+     * Normalizes and validates a message selector.
+     *
+     * @param selector
+     *        the selector expression, may be null.
+     *
+     * @return null when the selector is null or blank, otherwise the selector unchanged.
+     *
+     * @throws InvalidSelectorException if a parser is available and rejects the selector.
+     */
+    public static String checkSelector(String selector) throws InvalidSelectorException {
+        if (selector == null) {
+            return null;
+        }
+
+        final boolean blank = selector.trim().length() == 0;
+        if (blank) {
+            return null;
+        }
+
+        // Validation is skipped entirely when no parser could be set up.
+        if (SELECTOR_PARSER != null) {
+            OptionalSectorParser.check(selector);
+        }
+
+        return selector;
+    }
+
+    public static void checkDestination(Destination dest) throws InvalidDestinationException {
+        if (dest == null) {
+            throw new InvalidDestinationException("Destination cannot be null");
+        }
+    }
+
+    /**
+     * Starts the Session if it is not already started: delivers every message that was
+     * queued while stopped, then starts each consumer.
+     *
+     * @throws JMSException if starting a consumer fails.
+     */
+    protected void start() throws JMSException {
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+
+        for (JmsInboundMessageDispatch pending = this.stoppedMessages.poll();
+             pending != null;
+             pending = this.stoppedMessages.poll()) {
+            deliver(pending);
+        }
+
+        for (JmsMessageConsumer consumer : consumers.values()) {
+            consumer.start();
+        }
+    }
+
+    /**
+     * Marks the Session as stopped, shuts down the dispatch executor if one has been
+     * created, and stops every consumer.
+     *
+     * @throws JMSException if stopping a consumer fails.
+     */
+    protected void stop() throws JMSException {
+        started.set(false);
+        if (executor != null) {
+            executor.shutdown();
+            // Dropping the reference lets getExecutor() create a fresh executor later.
+            executor = null;
+        }
+        for (JmsMessageConsumer consumer : consumers.values()) {
+            consumer.stop();
+        }
+    }
+
+    protected boolean isStarted() {
+        return this.started.get();
+    }
+
+    public JmsConnection getConnection() {
+        return this.connection;
+    }
+
+    /**
+     * Returns the executor used for this Session's dispatch work, creating a
+     * single-threaded one on first use.  Its thread is a daemon thread named after
+     * the Session id.
+     *
+     * @return the Session's executor.
+     */
+    Executor getExecutor() {
+        if (executor != null) {
+            return executor;
+        }
+
+        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread dispatcher = new Thread(runner);
+                dispatcher.setDaemon(true);
+                dispatcher.setName("JmsSession ["+ sessionInfo.getSessionId() + "] dispatcher");
+                return dispatcher;
+            }
+        });
+        return executor;
+    }
+
+    protected JmsSessionInfo getSessionInfo() {
+        return this.sessionInfo;
+    }
+
+    protected JmsSessionId getSessionId() {
+        return this.sessionInfo.getSessionId();
+    }
+
+    protected JmsConsumerId getNextConsumerId() {
+        return new JmsConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.incrementAndGet());
+    }
+
+    protected JmsProducerId getNextProducerId() {
+        return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet());
+    }
+
+    private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
+        return new JmsMessageId(producer.getProducerId(), producer.getNextMessageSequence());
+    }
+
+    private <T extends JmsMessage> T init(T message) {
+        message.setConnection(connection);
+        return message;
+    }
+
+    private synchronized void startNextTransaction() throws JMSException {
+        if (getTransacted()) {
+            transactionContext.begin();
+        }
+    }
+
+    /**
+     * Checks whether any consumer of this Session is using the given destination.
+     *
+     * @param destination
+     *        the destination to look for.
+     *
+     * @return true if at least one consumer reports that it uses the destination.
+     */
+    boolean isDestinationInUse(JmsDestination destination) {
+        boolean inUse = false;
+        for (JmsMessageConsumer candidate : consumers.values()) {
+            if (candidate.isUsingDestination(destination)) {
+                inUse = true;
+                break;
+            }
+        }
+        return inUse;
+    }
+
+    void checkMessageListener() throws JMSException {
+        if (messageListener != null) {
+            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+        }
+        for (JmsMessageConsumer consumer : consumers.values()) {
+            if (consumer.hasMessageListener()) {
+                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+            }
+        }
+    }
+
+    /**
+     * @return the prefetch policy currently assigned to this session.
+     */
+    public JmsPrefetchPolicy getPrefetchPolicy() {
+        return this.prefetchPolicy;
+    }
+
+    /**
+     * Replaces the prefetch policy assigned to this session.
+     *
+     * @param prefetchPolicy
+     *        the new policy instance to store.
+     */
+    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy;
+    }
+
+    /**
+     * Delivers the inbound message immediately when the session is started;
+     * otherwise the message is added to the stopped messages collection.
+     *
+     * @param envelope
+     *        the inbound message dispatch to handle.
+     */
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+        if (!started.get()) {
+            stoppedMessages.add(envelope);
+            return;
+        }
+        deliver(envelope);
+    }
+
+    /**
+     * Forwards the connection interrupted event to every producer and then to
+     * every consumer of this session.
+     */
+    protected void onConnectionInterrupted() {
+        for (JmsMessageProducer sender : producers) {
+            sender.onConnectionInterrupted();
+        }
+
+        for (JmsMessageConsumer receiver : consumers.values()) {
+            receiver.onConnectionInterrupted();
+        }
+    }
+
+    /**
+     * Re-creates this session on the given provider and then lets the producers
+     * and consumers perform their own recovery.
+     *
+     * @param provider
+     *        the provider on which the session resources are re-created.
+     *
+     * @throws Exception if any step of the recovery fails.
+     */
+    protected void onConnectionRecovery(Provider provider) throws Exception {
+
+        // Create the session on the provider and wait for the request to complete.
+        ProviderFuture sessionCreated = new ProviderFuture();
+        provider.create(sessionInfo, sessionCreated);
+        sessionCreated.sync();
+
+        // A transacted session that was inside a transaction gets a cleared context
+        // and a newly begun transaction.
+        if (this.acknowledgementMode == SESSION_TRANSACTED && transactionContext.isInTransaction()) {
+            transactionContext.clear();
+            transactionContext.begin();
+        }
+
+        for (JmsMessageProducer sender : producers) {
+            sender.onConnectionRecovery(provider);
+        }
+
+        for (JmsMessageConsumer receiver : consumers.values()) {
+            receiver.onConnectionRecovery(provider);
+        }
+    }
+
+    /**
+     * Picks up the message factory of the given provider and forwards the
+     * recovered event to every producer and then to every consumer.
+     *
+     * @param provider
+     *        the provider whose message factory is adopted by this session.
+     *
+     * @throws Exception if a producer or consumer fails to handle the event.
+     */
+    protected void onConnectionRecovered(Provider provider) throws Exception {
+
+        messageFactory = provider.getMessageFactory();
+
+        for (JmsMessageProducer sender : producers) {
+            sender.onConnectionRecovered(provider);
+        }
+
+        for (JmsMessageConsumer receiver : consumers.values()) {
+            receiver.onConnectionRecovered(provider);
+        }
+    }
+
+    /**
+     * Forwards the connection restored event to every producer and then to
+     * every consumer of this session.
+     */
+    protected void onConnectionRestored() {
+        for (JmsMessageProducer sender : producers) {
+            sender.onConnectionRestored();
+        }
+
+        for (JmsMessageConsumer receiver : consumers.values()) {
+            receiver.onConnectionRestored();
+        }
+    }
+
+    /**
+     * Routes an inbound message either to the session level MessageListener, when
+     * one is set, or to the consumer registered under the envelope's consumer id.
+     * <p>
+     * An envelope without a consumer id is reported to the connection through
+     * onException. The session level listener still receives such a message, but
+     * no consumer lookup is attempted for it.
+     *
+     * @param envelope
+     *        the inbound message dispatch to route.
+     */
+    private void deliver(JmsInboundMessageDispatch envelope) {
+        JmsConsumerId id = envelope.getConsumerId();
+        if (id == null) {
+            this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage()));
+        }
+        if (this.messageListener != null) {
+            this.messageListener.onMessage(envelope.getMessage());
+        } else if (id != null) {
+            // The missing id was already reported above. Looking it up would be
+            // pointless, and a map that rejects null keys (e.g. ConcurrentHashMap)
+            // would throw a NullPointerException on the dispatching thread.
+            JmsMessageConsumer consumer = this.consumers.get(id);
+            if (consumer != null) {
+                consumer.onMessage(envelope);
+            }
+        }
+    }
+
+    /**
+     * Assigns the transaction context used by this session.
+     *
+     * @param transactionContext
+     *        the object through which the session's JMS transaction is controlled.
+     */
+    public void setTransactionContext(JmsLocalTransactionContext transactionContext) {
+        this.transactionContext = transactionContext;
+    }
+
+    /**
+     * Provides access to the transaction context used by this session.
+     *
+     * @return the transaction context currently assigned to the session.
+     */
+    public JmsLocalTransactionContext getTransactionContext() {
+        return this.transactionContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
new file mode 100644
index 0000000..cefe491
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.provider.Provider;
+
+/**
+ * SSL Aware Factory class that allows for configuration of the SSL values used
+ * in the Provider transports that are SSL aware.
+ */
+public class JmsSslConnectionFactory extends JmsConnectionFactory {
+
+    /**
+     * SSL settings owned by this factory.  A private copy of the calling thread's
+     * current JmsSslContext is taken at construction time.  The current context may
+     * be the shared default instance, so holding it directly would let this
+     * factory's setters silently change the settings seen by every other factory.
+     */
+    private final JmsSslContext configured = JmsSslContext.getCurrentSslContext().copy();
+
+    public JmsSslConnectionFactory() {
+    }
+
+    public JmsSslConnectionFactory(String username, String password) {
+        super(username, password);
+    }
+
+    public JmsSslConnectionFactory(String brokerURI) {
+        super(brokerURI);
+    }
+
+    public JmsSslConnectionFactory(URI brokerURI) {
+        super(brokerURI);
+    }
+
+    public JmsSslConnectionFactory(String username, String password, URI brokerURI) {
+        super(username, password, brokerURI);
+    }
+
+    public JmsSslConnectionFactory(String username, String password, String brokerURI) {
+        super(username, password, brokerURI);
+    }
+
+    /**
+     * Installs a copy of this factory's SSL settings as the current JmsSslContext
+     * of the calling thread and then delegates provider creation to the parent.
+     *
+     * @param brokerURI
+     *        the URI handed to the parent's createProvider.
+     *
+     * @return the Provider created by the parent factory.
+     *
+     * @throws Exception if the parent fails to create the provider.
+     */
+    @Override
+    protected Provider createProvider(URI brokerURI) throws Exception {
+        // Create and set a new instance as the current JmsSslContext for this thread
+        // based on current configuration settings.
+        JmsSslContext.setCurrentSslContext(configured.copy());
+        return super.createProvider(brokerURI);
+    }
+
+    /**
+     * @return the key store location configured on this factory.
+     */
+    public String getKeyStoreLocation() {
+        return configured.getKeyStoreLocation();
+    }
+
+    /**
+     * @param keyStoreLocation
+     *        the key store location to use for providers created by this factory.
+     */
+    public void setKeyStoreLocation(String keyStoreLocation) {
+        this.configured.setKeyStoreLocation(keyStoreLocation);
+    }
+
+    /**
+     * @return the key store password configured on this factory.
+     */
+    public String getKeyStorePassword() {
+        return configured.getKeyStorePassword();
+    }
+
+    /**
+     * @param keyStorePassword
+     *        the key store password to use for providers created by this factory.
+     */
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.configured.setKeyStorePassword(keyStorePassword);
+    }
+
+    /**
+     * @return the trust store location configured on this factory.
+     */
+    public String getTrustStoreLocation() {
+        return configured.getTrustStoreLocation();
+    }
+
+    /**
+     * @param trustStoreLocation
+     *        the trust store location to use for providers created by this factory.
+     */
+    public void setTrustStoreLocation(String trustStoreLocation) {
+        this.configured.setTrustStoreLocation(trustStoreLocation);
+    }
+
+    /**
+     * @return the trust store password configured on this factory.
+     */
+    public String getTrustStorePassword() {
+        return configured.getTrustStorePassword();
+    }
+
+    /**
+     * @param trustStorePassword
+     *        the trust store password to use for providers created by this factory.
+     */
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.configured.setTrustStorePassword(trustStorePassword);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
new file mode 100644
index 0000000..feca77c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Provides a wrapper around the SSL settings that are used by Provider transport
+ * instances that use an SSL encryption layer.
+ */
+public class JmsSslContext {
+
+    private String keyStoreLocation;
+    private String keyStorePassword;
+    private String trustStoreLocation;
+    private String trustStorePassword;
+
+    // Default context, populated once from the standard javax.net.ssl system properties.
+    private static final JmsSslContext initial = new JmsSslContext();
+    // Per-thread current context; threads that never set one see the shared default.
+    private static final ThreadLocal<JmsSslContext> current;
+
+    static {
+
+        initial.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
+        initial.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+        initial.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
+        // The trust store has its own password property; it must not reuse the key store's.
+        initial.setTrustStorePassword(System.getProperty("javax.net.ssl.trustStorePassword"));
+
+        current = new ThreadLocal<JmsSslContext>() {
+
+            @Override
+            protected JmsSslContext initialValue() {
+                return initial;
+            }
+        };
+    }
+
+    protected JmsSslContext() {
+    }
+
+    /**
+     * Creates an independent context carrying the same four settings as this one.
+     *
+     * @return a new JmsSslContext with this instance's current values.
+     */
+    public JmsSslContext copy() {
+        JmsSslContext result = new JmsSslContext();
+        result.setKeyStoreLocation(keyStoreLocation);
+        result.setKeyStorePassword(keyStorePassword);
+        result.setTrustStoreLocation(trustStoreLocation);
+        result.setTrustStorePassword(trustStorePassword);
+        return result;
+    }
+
+    /**
+     * Sets the context that getCurrentSslContext() returns on the calling thread.
+     *
+     * @param bs
+     *        the context to make current for this thread.
+     */
+    public static void setCurrentSslContext(JmsSslContext bs) {
+        current.set(bs);
+    }
+
+    /**
+     * Returns the calling thread's current context.  Until a thread sets its own
+     * context this is the shared default instance, so changes made through the
+     * returned object are visible to every other thread still using the default.
+     *
+     * @return the current JmsSslContext for the calling thread.
+     */
+    public static JmsSslContext getCurrentSslContext() {
+        return current.get();
+    }
+
+    public String getKeyStoreLocation() {
+        return keyStoreLocation;
+    }
+
+    public void setKeyStoreLocation(String keyStoreLocation) {
+        this.keyStoreLocation = keyStoreLocation;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.keyStorePassword = keyStorePassword;
+    }
+
+    public String getTrustStoreLocation() {
+        return trustStoreLocation;
+    }
+
+    public void setTrustStoreLocation(String trustStoreLocation) {
+        this.trustStoreLocation = trustStoreLocation;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
new file mode 100644
index 0000000..cff489b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
+/**
+ * Temporary Queue Object
+ */
+public class JmsTemporaryQueue extends JmsDestination implements TemporaryQueue {
+
+    /**
+     * Creates a temporary queue without a name.
+     */
+    public JmsTemporaryQueue() {
+        super(null, false, true);
+    }
+
+    /**
+     * Creates a temporary queue with the given name.
+     *
+     * @param name
+     *        the name handed to the parent destination.
+     */
+    public JmsTemporaryQueue(String name) {
+        super(name, false, true);
+    }
+
+    /**
+     * Creates a new temporary queue and applies this instance's properties to it.
+     *
+     * @return the newly created JmsTemporaryQueue.
+     */
+    @Override
+    public JmsTemporaryQueue copy() {
+        final JmsTemporaryQueue duplicate = new JmsTemporaryQueue();
+        duplicate.setProperties(getProperties());
+        return duplicate;
+    }
+
+    /**
+     * Attempts the delete and rethrows any JMSException wrapped in a RuntimeException.
+     *
+     * @see javax.jms.TemporaryQueue#delete()
+     */
+    @Override
+    public void delete() {
+        try {
+            tryDelete();
+        } catch (JMSException cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /**
+     * @return the name of this destination.
+     * @see javax.jms.Queue#getQueueName()
+     */
+    @Override
+    public String getQueueName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
new file mode 100644
index 0000000..46dfed3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+
+/**
+ * Temporary Topic Object
+ */
+public class JmsTemporaryTopic extends JmsDestination implements TemporaryTopic {
+
+    /**
+     * Creates a temporary topic without a name.
+     */
+    public JmsTemporaryTopic() {
+        this(null);
+    }
+
+    /**
+     * Creates a temporary topic with the given name.
+     *
+     * @param name
+     *        the name handed to the parent destination.
+     */
+    public JmsTemporaryTopic(String name) {
+        super(name, true, true);
+    }
+
+    /**
+     * Creates a new temporary topic and applies this instance's properties to it.
+     *
+     * @return the newly created JmsTemporaryTopic.
+     */
+    @Override
+    public JmsTemporaryTopic copy() {
+        final JmsTemporaryTopic duplicate = new JmsTemporaryTopic();
+        duplicate.setProperties(getProperties());
+        return duplicate;
+    }
+
+    /**
+     * Attempts the delete and rethrows any JMSException wrapped in a RuntimeException.
+     *
+     * @see javax.jms.TemporaryTopic#delete()
+     */
+    @Override
+    public void delete() {
+        try {
+            tryDelete();
+        } catch (JMSException cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /**
+     * @return the name of this destination.
+     * @see javax.jms.Topic#getTopicName()
+     */
+    @Override
+    public String getTopicName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
new file mode 100644
index 0000000..1840cc7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Topic;
+
+/**
+ * JMS Topic object.
+ */
+public class JmsTopic extends JmsDestination implements Topic {
+
+    /**
+     * Creates a topic without a name.
+     */
+    public JmsTopic() {
+        super(null, true, false);
+    }
+
+    /**
+     * Creates a topic with the given name.
+     *
+     * @param name
+     *        the name handed to the parent destination.
+     */
+    public JmsTopic(String name) {
+        super(name, true, false);
+    }
+
+    /**
+     * Creates a new topic and applies this instance's properties to it.
+     *
+     * @return the newly created JmsTopic.
+     */
+    @Override
+    public JmsTopic copy() {
+        final JmsTopic duplicate = new JmsTopic();
+        duplicate.setProperties(getProperties());
+        return duplicate;
+    }
+
+    /**
+     * @return the name of this destination.
+     * @see javax.jms.Topic#getTopicName()
+     */
+    @Override
+    public String getTopicName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
new file mode 100644
index 0000000..c8fcaba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.util.IdGenerator;
+
+/**
+ * Connection variant that rejects the Queue specific operations.
+ */
+public class JmsTopicConnection extends JmsConnection {
+
+    // Message carried by every exception thrown for a rejected Queue operation.
+    private static final String NOT_SUPPORTED_MESSAGE = "Operation not supported by a TopicConnection";
+
+    public JmsTopicConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+        super(connectionId, provider, clientIdGenerator);
+    }
+
+    /**
+     * Always rejected: a Queue based connection consumer is not available here.
+     *
+     * @throws JMSException always, as a javax.jms.IllegalStateException.
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        throw new javax.jms.IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: a QueueSession cannot be created from this connection.
+     *
+     * @throws JMSException always, as a javax.jms.IllegalStateException.
+     */
+    @Override
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        throw new javax.jms.IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
new file mode 100644
index 0000000..47d5088
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.apache.qpid.jms.meta.JmsProducerId;
+
+/**
+ * Implementation of a TopicPublisher
+ */
+public class JmsTopicPublisher extends JmsMessageProducer implements TopicPublisher {
+
+    /**
+     * Creates a publisher by passing all arguments through to the parent
+     * JmsMessageProducer constructor.
+     *
+     * @param id
+     *        the producer id assigned to this publisher.
+     * @param session
+     *        the session that owns this publisher.
+     * @param destination
+     *        the destination this publisher is created for.
+     * @throws JMSException
+     *         if the parent constructor fails.
+     */
+    protected JmsTopicPublisher(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException {
+        super(id, session, destination);
+    }
+
+    /**
+     * Returns the destination held in the producer info, cast to a Topic.
+     *
+     * @return the Topic
+     * @throws IllegalStateException
+     *         if checkClosed() rejects the call (presumably when the publisher is closed).
+     * @see javax.jms.TopicPublisher#getTopic()
+     */
+    @Override
+    public Topic getTopic() throws IllegalStateException {
+        checkClosed();
+        return (Topic) this.producerInfo.getDestination();
+    }
+
+    /**
+     * Publishes by delegating to the parent's send(Message).
+     *
+     * @param message
+     *        the message to publish.
+     * @throws JMSException
+     *         if the parent's send fails.
+     * @see javax.jms.TopicPublisher#publish(javax.jms.Message)
+     */
+    @Override
+    public void publish(Message message) throws JMSException {
+        super.send(message);
+    }
+
+    /**
+     * Publishes by delegating to the parent's send(Destination, Message).
+     *
+     * @param topic
+     *        the topic to publish to.
+     * @param message
+     *        the message to publish.
+     * @throws JMSException
+     *         if the parent's send fails.
+     * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message)
+     */
+    @Override
+    public void publish(Topic topic, Message message) throws JMSException {
+        super.send(topic, message);
+    }
+
+    /**
+     * Publishes by delegating to the parent's send with explicit delivery options.
+     *
+     * @param message
+     *        the message to publish.
+     * @param deliveryMode
+     *        the delivery mode passed through to send.
+     * @param priority
+     *        the priority passed through to send.
+     * @param timeToLive
+     *        the time to live passed through to send.
+     * @throws JMSException
+     *         if the parent's send fails.
+     * @see javax.jms.TopicPublisher#publish(javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Publishes to the given topic by delegating to the parent's send with
+     * explicit delivery options.
+     *
+     * @param topic
+     *        the topic to publish to.
+     * @param message
+     *        the message to publish.
+     * @param deliveryMode
+     *        the delivery mode passed through to send.
+     * @param priority
+     *        the priority passed through to send.
+     * @param timeToLive
+     *        the time to live passed through to send.
+     * @throws JMSException
+     *         if the parent's send fails.
+     * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        super.send(topic, message, deliveryMode, priority, timeToLive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
new file mode 100644
index 0000000..ff834aa
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.meta.JmsSessionId;
+
+/**
+ * Implementation of a TopicSession
+ */
+public class JmsTopicSession extends JmsSession {
+
+    // Message carried by every exception thrown for a rejected Queue operation.
+    private static final String NOT_SUPPORTED_MESSAGE = "Operation not supported by a TopicSession";
+
+    protected JmsTopicSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+        super(connection, sessionId, acknowledgementMode);
+    }
+
+    /**
+     * Always rejected: queue browsers are not available on this session type.
+     *
+     * @param queue
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.Session#createBrowser(javax.jms.Queue)
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: queue browsers are not available on this session type.
+     *
+     * @param queue
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Creates a consumer through the parent session unless the destination is a Queue.
+     *
+     * @param destination
+     *        the destination to consume from.
+     * @return the consumer created by the parent session.
+     * @throws JMSException if the destination is a Queue or the parent fails.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        if (!(destination instanceof Queue)) {
+            return super.createConsumer(destination);
+        }
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Creates a consumer through the parent session unless the destination is a Queue.
+     *
+     * @param destination
+     *        the destination to consume from.
+     * @param messageSelector
+     *        the selector handed to the parent session.
+     * @return the consumer created by the parent session.
+     * @throws JMSException if the destination is a Queue or the parent fails.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        if (!(destination instanceof Queue)) {
+            return super.createConsumer(destination, messageSelector);
+        }
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Creates a producer through the parent session unless the destination is a Queue.
+     *
+     * @param destination
+     *        the destination to produce to.
+     * @return the producer created by the parent session.
+     * @throws JMSException if the destination is a Queue or the parent fails.
+     * @see javax.jms.Session#createProducer(javax.jms.Destination)
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        if (!(destination instanceof Queue)) {
+            return super.createProducer(destination);
+        }
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: queues cannot be created from this session type.
+     *
+     * @param queueName
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.Session#createQueue(java.lang.String)
+     */
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: temporary queues cannot be created from this session type.
+     *
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.Session#createTemporaryQueue()
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: queue receivers are not available on this session type.
+     *
+     * @param queue
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
+     */
+    @Override
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: queue receivers are not available on this session type.
+     *
+     * @param queue
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue,
+     *      java.lang.String)
+     */
+    @Override
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Always rejected: queue senders are not available on this session type.
+     *
+     * @param queue
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException always, as an IllegalStateException.
+     * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
+     */
+    @Override
+    public QueueSender createSender(Queue queue) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED_MESSAGE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
new file mode 100644
index 0000000..0ef463a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a TopicSubscriber
+ */
+public class JmsTopicSubscriber extends JmsMessageConsumer implements TopicSubscriber {
+
+    /**
+     * Creates a non-durable TopicSubscriber
+     *
+     * @param id
+     *        the consumer id assigned to this subscriber.
+     * @param s
+     *        the session that owns this subscriber.
+     * @param destination
+     *        the destination to subscribe to.
+     * @param noLocal
+     *        the noLocal flag passed through to the parent consumer.
+     * @param selector
+     *        the message selector passed through to the parent consumer.
+     * @throws JMSException
+     *         if the parent constructor fails.
+     */
+    JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, boolean noLocal, String selector) throws JMSException {
+        // The parent takes (selector, noLocal), the reverse of this constructor's order.
+        super(id, s, destination, selector, noLocal);
+    }
+
+    /**
+     * Creates a TopicSubscriber that is durable.
+     *
+     * @param id
+     *        the consumer id assigned to this subscriber.
+     * @param s
+     *        the session that owns this subscriber.
+     * @param destination
+     *        the destination to subscribe to.
+     * @param name
+     *        the subscription name passed through to the parent consumer.
+     * @param noLocal
+     *        the noLocal flag passed through to the parent consumer.
+     * @param selector
+     *        the message selector passed through to the parent consumer.
+     * @throws JMSException
+     *         if the parent constructor fails.
+     */
+    JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException {
+        // The parent takes (name, selector, noLocal), so selector and noLocal swap places here.
+        super(id, s, destination, name, selector, noLocal);
+    }
+
+    /**
+     * Returns this consumer's destination cast to a Topic.
+     *
+     * @return the Topic
+     * @throws IllegalStateException
+     *         if checkClosed() rejects the call (presumably when the subscriber is closed).
+     * @see javax.jms.TopicSubscriber#getTopic()
+     */
+    @Override
+    public Topic getTopic() throws IllegalStateException {
+        checkClosed();
+        return (Topic) this.getDestination();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
new file mode 100644
index 0000000..c0704a1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Allows for a listener to be notified when a transaction is started, commits
+ * or is rolled back.
+ */
+public interface JmsTransactionListener {
+
+    /**
+     * Notifies the listener that a transaction has been started.
+     */
+    void onTransactionStarted();
+
+    /**
+     * Notifies the listener that a transaction has been committed.
+     */
+    void onTransactionCommitted();
+
+    /**
+     * Notifies the listener that a transaction has been rolled back.
+     */
+    void onTransactionRolledBack();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
new file mode 100644
index 0000000..bda7979
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Base class for JmsResources that are part of a running transaction to use
+ * to register for notifications of transaction commit and rollback in order
+ * to execute specific actions.
+ *
+ * One such use of this might be for a consumer to register a synchronization
+ * when it is closed while its parent session is still operating inside a
+ * transaction.  The Consumer can close itself following the commit or rollback
+ * of the running Transaction.
+ */
+public abstract class JmsTxSynchronization {
+
+    /**
+     * Hook invoked once the current Transaction has been committed
+     * successfully.  The default implementation does nothing.
+     *
+     * @throws Exception
+     *         if an overriding implementation fails while handling the commit
+     */
+    public void afterCommit() throws Exception {
+        // Intentionally empty: subclasses override this to react to a commit.
+    }
+
+    /**
+     * Hook invoked once the current transaction has been rolled back, whether
+     * through an explicit rollback call or because a commit operation could
+     * not be completed.  The default implementation does nothing.
+     *
+     * @throws Exception
+     *         if an overriding implementation fails while handling the rollback
+     */
+    public void afterRollback() throws Exception {
+        // Intentionally empty: subclasses override this to react to a rollback.
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
new file mode 100644
index 0000000..b04b988
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.exceptions;
+
+public class IdConversionException extends QpidJmsException {
+
+    private static final long serialVersionUID = -2349723813650476823L;
+
+    /**
+     * Creates the exception with a reason and no underlying cause.
+     *
+     * @param reason
+     *        description of the failure, passed to QpidJmsException
+     */
+    public IdConversionException(String reason) {
+        super(reason);
+    }
+
+    /**
+     * Creates the exception with a reason and the exception that triggered it.
+     *
+     * @param reason
+     *        description of the failure, passed to QpidJmsException
+     * @param cause
+     *        the underlying exception, passed to QpidJmsException
+     */
+    public IdConversionException(String reason, Exception cause) {
+        super(reason, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
new file mode 100644
index 0000000..e58c54f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import java.io.IOException;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when an attempt is made to use a connection when the connection has been closed.
+ */
+public class JmsConnectionClosedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -7975982446284065025L;
+
+    /**
+     * Creates the exception with a fixed reason and the error code
+     * "AlreadyClosed", without any underlying cause.
+     */
+    public JmsConnectionClosedException() {
+        super("The JMS connection has been closed", "AlreadyClosed");
+    }
+
+    /**
+     * Creates the exception from the I/O failure that closed the connection.
+     * The failure's description is appended to the reason, and the failure is
+     * recorded both as the cause and as the JMS linked exception.
+     *
+     * @param cause
+     *        the I/O failure behind the closed connection; must not be null
+     */
+    public JmsConnectionClosedException(IOException cause) {
+        super("The JMS connection has been closed: " + extractMessage(cause));
+        initCause(cause);
+        setLinkedException(cause);
+    }
+
+    /**
+     * Picks the text used to describe the cause: its message when that is
+     * non-empty, otherwise its toString() form.
+     */
+    private static String extractMessage(IOException cause) {
+        final String text = cause.getMessage();
+        return (text != null && text.length() != 0) ? text : cause.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
new file mode 100644
index 0000000..e9b7068
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import java.io.IOException;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when an attempt is made to use a connection when the connection has already failed.
+ */
+public class JmsConnectionFailedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3386897790274799220L;
+
+    /**
+     * Creates the exception with a fixed reason and the error code
+     * "Connection Failed", without any underlying cause.
+     */
+    public JmsConnectionFailedException() {
+        super("The JMS connection has failed due to a Transport problem", "Connection Failed");
+    }
+
+    /**
+     * Creates the exception from the I/O failure that broke the connection.
+     * The failure's description is appended to the reason, and the failure is
+     * recorded both as the cause and as the JMS linked exception.
+     *
+     * @param cause
+     *        the I/O failure behind the failed connection; must not be null
+     */
+    public JmsConnectionFailedException(IOException cause) {
+        super("The JMS connection has failed: " + extractMessage(cause));
+        initCause(cause);
+        setLinkedException(cause);
+    }
+
+    /**
+     * Picks the text used to describe the cause: its message when that is
+     * non-empty, otherwise its toString() form.
+     */
+    private static String extractMessage(IOException cause) {
+        final String text = cause.getMessage();
+        return (text != null && text.length() != 0) ? text : cause.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
new file mode 100644
index 0000000..81f9ca8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+/**
+ * Exception support class.
+ *
+ * Factory class for creating JMSException instances based on String messages or by
+ * wrapping other non-JMS exception.
+ *
+ * @since 1.0
+ */
+public final class JmsExceptionSupport {
+
+    private JmsExceptionSupport() {}
+
+    /**
+     * Creates a JMSException with the given message and cause.  When the cause
+     * is an Exception it is also recorded as the JMS linked exception, so the
+     * result does not depend on the static type used at the call site.
+     *
+     * @param msg
+     *        the reason text for the new exception
+     * @param cause
+     *        the underlying failure
+     * @return a new JMSException wrapping the cause
+     */
+    public static JMSException create(String msg, Throwable cause) {
+        JMSException exception = new JMSException(msg);
+        attachCause(exception, cause);
+        return exception;
+    }
+
+    /**
+     * Creates a JMSException with the given message, recording the cause both
+     * as the linked exception and as the cause.
+     *
+     * @param msg
+     *        the reason text for the new exception
+     * @param cause
+     *        the underlying failure
+     * @return a new JMSException wrapping the cause
+     */
+    public static JMSException create(String msg, Exception cause) {
+        return create(msg, (Throwable) cause);
+    }
+
+    /**
+     * Converts a Throwable into a JMSException.  A cause that already is a
+     * JMSException, or whose direct cause is one, is returned as is; anything
+     * else is wrapped in a new JMSException described by the cause's message,
+     * or by its toString() form when the message is null or empty.
+     *
+     * @param cause
+     *        the failure to convert; must not be null
+     * @return the existing or newly created JMSException
+     */
+    public static JMSException create(Throwable cause) {
+        if (cause instanceof JMSException) {
+            return (JMSException) cause;
+        }
+        if (cause.getCause() instanceof JMSException) {
+            return (JMSException) cause.getCause();
+        }
+        return create(describe(cause), cause);
+    }
+
+    /**
+     * Converts an Exception into a JMSException, following the same rules as
+     * {@link #create(Throwable)}.
+     *
+     * @param cause
+     *        the failure to convert; must not be null
+     * @return the existing or newly created JMSException
+     */
+    public static JMSException create(Exception cause) {
+        return create((Throwable) cause);
+    }
+
+    /**
+     * Wraps the given failure in a new MessageEOFException.
+     *
+     * @param cause
+     *        the failure to wrap; must not be null
+     * @return a new MessageEOFException describing and linking the cause
+     */
+    public static MessageEOFException createMessageEOFException(Exception cause) {
+        MessageEOFException exception = new MessageEOFException(describe(cause));
+        attachCause(exception, cause);
+        return exception;
+    }
+
+    /**
+     * Wraps the given failure in a new MessageFormatException.  When the cause
+     * is an Exception it is also recorded as the JMS linked exception.
+     *
+     * @param cause
+     *        the failure to wrap; must not be null
+     * @return a new MessageFormatException describing and linking the cause
+     */
+    public static MessageFormatException createMessageFormatException(Throwable cause) {
+        MessageFormatException exception = new MessageFormatException(describe(cause));
+        attachCause(exception, cause);
+        return exception;
+    }
+
+    /**
+     * Returns the cause's message, or its toString() form when the message is
+     * null or empty.
+     */
+    private static String describe(Throwable cause) {
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        return msg;
+    }
+
+    /**
+     * Records the cause on the target exception: always as the cause, and as
+     * the linked exception too whenever the cause is an Exception.
+     */
+    private static void attachCause(JMSException target, Throwable cause) {
+        if (cause instanceof Exception) {
+            target.setLinkedException((Exception) cause);
+        }
+        target.initCause(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
new file mode 100644
index 0000000..a922530
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.exceptions;
+
+import javax.jms.JMSException;
+
+public class QpidJmsException extends JMSException {
+
+    private static final long serialVersionUID = 751932967255393054L;
+
+    /**
+     * Creates the exception with a reason and no underlying cause.
+     *
+     * @param reason
+     *        description of the failure
+     */
+    public QpidJmsException(String reason) {
+        this(reason, null);
+    }
+
+    /**
+     * Creates the exception with a reason and, when one is supplied, records
+     * the cause both as the JMS linked exception and as the cause.
+     *
+     * @param reason
+     *        description of the failure
+     * @param cause
+     *        the underlying exception, or null when there is none
+     */
+    public QpidJmsException(String reason, Exception cause) {
+        super(reason);
+        if (cause == null) {
+            // Nothing to link; leave the cause uninitialized.
+            return;
+        }
+        setLinkedException(cause);
+        initCause(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
new file mode 100644
index 0000000..5d0d04a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.jndi;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.NamingException;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+import javax.naming.spi.ObjectFactory;
+
+/**
+ * Converts objects implementing JNDIStorable into property fields so they can be
+ * stored and regenerated from JNDI
+ *
+ * @since 1.0
+ */
+public class JNDIReferenceFactory implements ObjectFactory {
+
+    /**
+     * This will be called by a JNDI provider when a Reference is retrieved from
+     * a JNDI store - and generates the original instance.
+     *
+     * Every address held by the Reference is copied into a property map keyed
+     * by the address type; a null address content is stored as an empty
+     * string.  The map is then applied to a new instance of the referenced
+     * class.
+     *
+     * @param object
+     *        the Reference object
+     * @param name
+     *        the JNDI name
+     * @param nameCtx
+     *        the context
+     * @param environment
+     *        the environment settings used by JNDI
+     *
+     * @return the instance built from the Reference object, or null when the
+     *         referenced class is not a JNDIStorable
+     *
+     * @throws Exception
+     *         if building the instance from Reference fails (usually class not
+     *         found)
+     */
+    @Override
+    public Object getObjectInstance(Object object, Name name, Context nameCtx, Hashtable<?, ?> environment)
+            throws Exception {
+        if (!(object instanceof Reference)) {
+            throw new RuntimeException("Object " + object + " is not a reference");
+        }
+
+        Reference reference = (Reference) object;
+        Class<?> theClass = loadClass(this, reference.getClassName());
+        if (!JNDIStorable.class.isAssignableFrom(theClass)) {
+            return null;
+        }
+
+        JNDIStorable store = (JNDIStorable) theClass.newInstance();
+        Map<String, String> properties = new HashMap<String, String>();
+        for (Enumeration<RefAddr> iter = reference.getAll(); iter.hasMoreElements();) {
+            // Only RefAddr methods are needed, so any address type is accepted
+            // rather than failing with a ClassCastException on non-String ones.
+            RefAddr addr = iter.nextElement();
+            Object content = addr.getContent();
+            properties.put(addr.getType(), (content == null) ? "" : content.toString());
+        }
+        store.setProperties(properties);
+        return store;
+    }
+
+    /**
+     * Create a Reference instance from a JNDIStorable object.  Each property of
+     * the storable becomes one StringRefAddr on the Reference.
+     *
+     * @param instanceClassName
+     *        the class name recorded in the Reference
+     * @param po
+     *        the storable whose properties are copied into the Reference
+     * @return Reference
+     * @throws NamingException
+     *         if the properties cannot be read or added; the original failure
+     *         is kept as the root cause
+     */
+    public static Reference createReference(String instanceClassName, JNDIStorable po) throws NamingException {
+        Reference result = new Reference(instanceClassName, JNDIReferenceFactory.class.getName(), null);
+        try {
+            Map<String, String> props = po.getProperties();
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                result.add(new StringRefAddr(entry.getKey(), entry.getValue()));
+            }
+        } catch (Exception e) {
+            // Keep the original failure attached so its stack trace is not lost.
+            NamingException failure = new NamingException(e.getMessage());
+            failure.setRootCause(e);
+            throw failure;
+        }
+        return result;
+    }
+
+    /**
+     * Load a named class, using the class loader of the supplied object when
+     * it has one and Class.forName otherwise.
+     *
+     * @param thisObj
+     *        the object whose class loader is tried first
+     * @param className
+     *        the name of the class to load
+     * @return the class
+     * @throws ClassNotFoundException
+     *         if the class cannot be found
+     */
+    public static Class<?> loadClass(Object thisObj, String className) throws ClassNotFoundException {
+        // try local ClassLoader first.
+        ClassLoader loader = thisObj.getClass().getClassLoader();
+        if (loader != null) {
+            return loader.loadClass(className);
+        }
+        // Will be null in jdk1.1.8
+        // use default classLoader
+        return Class.forName(className);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org