You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC

svn commit: r577253 [4/7] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: nclient/ nclient/impl/ nclient/util/ njms/ njms/message/

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,1324 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.njms.message.*;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.RangeSet;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implementation of the JMS Session interface
+ */
+public class SessionImpl implements Session
+{
+    /**
+     * this session's logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+    /**
+     * A queue for incoming asynch messages.
+     */
+    private final LinkedList<IncomingMessage> _incomingAsynchronousMessages = new LinkedList<IncomingMessage>();
+
+    //--- MessageDispatcherThread and Session locking
+    /**
+     * indicates that the MessageDispatcherThread has stopped
+     */
+    private boolean _hasStopped = false;
+
+    /**
+     * lock for the MessageDispatcherThread to wait until the session is stopped
+     */
+    private final Object _stoppingLock = new Object();
+
+    /**
+     * lock for the stopper thread to wait on when the MessageDispatcherThread is stopping
+     */
+    private final Object _stoppingJoin = new Object();
+
+    /**
+     * thread to dispatch messages to async consumers
+     */
+    private MessageDispatcherThread _messageDispatcherThread = null;
+    //----END
+
+    /**
+     * The messageActors of this session.
+     */
+    private final HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
+
+    /**
+     * All the not yet acknoledged messages
+     */
+    private final ArrayList<QpidMessage> _unacknowledgedMessages = new ArrayList<QpidMessage>();
+
+    /**
+     * Indicates whether this session is closed.
+     */
+    private boolean _isClosed = false;
+
+    /**
+     * Indicates whether this session is closing.
+     */
+    private boolean _isClosing = false;
+
+    /**
+     * Indicates whether this session is stopped.
+     */
+    private boolean _isStopped = false;
+
+    /**
+     * Used to indicate whether or not this is a transactional session.
+     */
+    private boolean _transacted;
+
+    /**
+     * Holds the sessions acknowledgement mode.
+     */
+    private int _acknowledgeMode;
+
+    /**
+     * The underlying QpidSession
+     */
+    private org.apache.qpidity.nclient.Session _qpidSession;
+
+    /**
+     * The latest qpid Exception that has been reaised.
+     */
+    private QpidException _currentException;
+
+    /**
+     * Indicates whether this session is recovering
+     */
+    private boolean _inRecovery = false;
+
+    /**
+     * This session connection
+     */
+    private ConnectionImpl _connection;
+
+    /**
+     * This will be used as the message actor id
+     * This in turn will be set as the destination
+     */
+    protected AtomicInteger _consumerTag = new AtomicInteger();
+
+    //--- Constructor
+    /**
+     * Create a JMS Session
+     *
+     * @param connection      The ConnectionImpl object from which the Session is created.
+     * @param transacted      Indicates if the session transacted.
+     * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+     *                        {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true.
+     * @param isXA            Indicates whether this session is an XA session.
+     * @throws QpidException In case of internal error.
+     */
+    protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA)
+            throws QpidException
+    {
+        _connection = connection;
+        _transacted = transacted;
+        // for transacted sessions we ignore the acknowledgeMode and use GenericAckMode.SESSION_TRANSACTED
+        if (_transacted)
+        {
+            acknowledgeMode = Session.SESSION_TRANSACTED;
+        }
+        _acknowledgeMode = acknowledgeMode;
+
+        // create the qpid session with an expiry  <= 0 so that the session does not expire
+        _qpidSession = _connection.getQpidConnection().createSession(0);
+        // set the exception listnere for this session
+        _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+        // set transacted if required
+        if (_transacted && !isXA)
+        {
+            _qpidSession.txSelect();
+        }
+        testQpidException();
+        // init the message dispatcher.
+        initMessageDispatcherThread();
+    }
+
+    //--- javax.njms.Session API
+    /**
+     * Creates a <CODE>BytesMessage</CODE> object used to send a message
+     * containing a stream of uninterpreted bytes.
+     *
+     * @return A BytesMessage.
+     * @throws JMSException If Creating a BytesMessage object fails due to some internal error.
+     */
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new BytesMessageImpl();
+    }
+
+    /**
+     * Creates a <CODE>MapMessage</CODE> object used to send a self-defining set
+     * of name-value pairs, where names are Strings and values are primitive values.
+     *
+     * @return A MapMessage.
+     * @throws JMSException If Creating a MapMessage object fails due to some internal error.
+     */
+    public MapMessage createMapMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new MapMessageImpl();
+    }
+
+    /**
+     * Creates a <code>Message</code> object that holds all the standard message header information.
+     * It can be sent when a message containing only header information is sufficient.
+     * We simply return a ByteMessage
+     *
+     * @return A Message.
+     * @throws JMSException If Creating a Message object fails due to some internal error.
+     */
+    public Message createMessage() throws JMSException
+    {
+        return new MessageImpl();
+    }
+
+    /**
+     * Creates an <code>ObjectMessage</code> used to send a message
+     * that contains a serializable Java object.
+     *
+     * @return An ObjectMessage.
+     * @throws JMSException If Creating an ObjectMessage object fails due to some internal error.
+     */
+    public ObjectMessage createObjectMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new ObjectMessageImpl();
+    }
+
+    /**
+     * Creates an initialized <code>ObjectMessage</code> used to send a message that contains
+     * a serializable Java object.
+     *
+     * @param serializable The object to use to initialize this message.
+     * @return An initialised ObjectMessage.
+     * @throws JMSException If Creating an initialised ObjectMessage object fails due to some internal error.
+     */
+    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException
+    {
+        ObjectMessage msg = createObjectMessage();
+        msg.setObject(serializable);
+        return msg;
+    }
+
+    /**
+     * Creates a <code>StreamMessage</code>  used to send a
+     * self-defining stream of primitive values in the Java programming
+     * language.
+     *
+     * @return A StreamMessage
+     * @throws JMSException If Creating an StreamMessage object fails due to some internal error.
+     */
+    public StreamMessage createStreamMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new StreamMessageImpl();
+    }
+
+    /**
+     * Creates a <code>TextMessage</code> object used to send a message containing a String.
+     *
+     * @return A TextMessage object
+     * @throws JMSException If Creating an TextMessage object fails due to some internal error.
+     */
+    public TextMessage createTextMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new TextMessageImpl();
+    }
+
+    /**
+     * Creates an initialized <code>TextMessage</code>  used to send
+     * a message containing a String.
+     *
+     * @param text The string used to initialize this message.
+     * @return An initialized TextMessage
+     * @throws JMSException If Creating an initialised TextMessage object fails due to some internal error.
+     */
+    public TextMessage createTextMessage(String text) throws JMSException
+    {
+        TextMessage msg = createTextMessage();
+        msg.setText(text);
+        return msg;
+    }
+
+    /**
+     * Indicates whether the session is in transacted mode.
+     *
+     * @return true if the session is in transacted mode
+     * @throws JMSException If geting the transaction mode fails due to some internal error.
+     */
+    public boolean getTransacted() throws JMSException
+    {
+        checkNotClosed();
+        return _transacted;
+    }
+
+    /**
+     * Returns the acknowledgement mode of this session.
+     * <p> The acknowledgement mode is set at the time that the session is created.
+     * If the session is transacted, the acknowledgement mode is ignored.
+     *
+     * @return If the session is not transacted, returns the current acknowledgement mode for the session.
+     *         else returns SESSION_TRANSACTED.
+     * @throws JMSException if geting the acknowledgement mode fails due to some internal error.
+     */
+    public int getAcknowledgeMode() throws JMSException
+    {
+        checkNotClosed();
+        return _acknowledgeMode;
+    }
+
+    /**
+     * Commits all messages done in this transaction.
+     *
+     * @throws JMSException                   If committing the transaction fails due to some internal error.
+     * @throws TransactionRolledBackException If the transaction is rolled back due to some internal error during commit.
+     * @throws javax.jms.IllegalStateException
+     *                                        If the method is not called by a transacted session.
+     */
+    public void commit() throws JMSException
+    {
+        checkNotClosed();
+        //make sure the Session is a transacted one
+        if (!_transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot commit non-transacted session, throwing IllegalStateException");
+            }
+            throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted");
+        }
+        // commit the underlying Qpid Session
+        _qpidSession.txCommit();
+        try
+        {
+            testQpidException();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Rolls back any messages done in this transaction.
+     *
+     * @throws JMSException If rolling back the session fails due to some internal error.
+     * @throws javax.jms.IllegalStateException
+     *                      If the method is not called by a transacted session.
+     */
+    public void rollback() throws JMSException
+    {
+        checkNotClosed();
+        //make sure the Session is a transacted one
+        if (!_transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot rollback non-transacted session, throwing IllegalStateException");
+            }
+            throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted");
+        }
+        // rollback the underlying Qpid Session
+        _qpidSession.txRollback();
+        try
+        {
+            testQpidException();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Closes this session.
+     * <p> The JMS specification says
+     * <P> This call will block until a <code>receive</code> call or message
+     * listener in progress has completed. A blocked message consumer
+     * <code>receive</code> call returns <code>null</code> when this session is closed.
+     * <P>Closing a transacted session must roll back the transaction in progress.
+     * <P>This method is the only <code>Session</code> method that can be called concurrently.
+     * <P>Invoking any other <code>Session</code> method on a closed session
+     * must throw a <code>javax.njms.IllegalStateException</code>.
+     * <p> Closing a closed session must <I>not</I> throw an exception.
+     *
+     * @throws JMSException If closing the session fails due to some internal error.
+     */
+    public synchronized void close() throws JMSException
+    {
+        if (!_isClosed)
+        {
+            _messageDispatcherThread.interrupt();
+            if (!_isClosing)
+            {
+                _isClosing = true;
+                // if the session is stopped then restart it before notifying on the lock
+                // that will stop the sessionThread
+                if (_isStopped)
+                {
+                    startDispatchThread();
+                }
+                //notify the sessionThread
+                synchronized (_incomingAsynchronousMessages)
+                {
+                    _incomingAsynchronousMessages.notifyAll();
+                }
+
+                try
+                {
+                    _messageDispatcherThread.join();
+                    _messageDispatcherThread = null;
+                }
+                catch (InterruptedException ie)
+                {
+                    /* ignore */
+                }
+            }
+            // from now all the session methods will throw a IllegalStateException
+            _isClosed = true;
+            // close all the actors
+            closeAllMessageActors();
+            _messageActors.clear();
+            // We may have a thread trying to add a message
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _incomingAsynchronousMessages.clear();
+                _incomingAsynchronousMessages.notifyAll();
+            }
+            // close the underlaying QpidSession
+            _qpidSession.sessionClose();
+            try
+            {
+                testQpidException();
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            }
+        }
+    }
+
+    /**
+     * Stops message delivery in this session, and restarts message delivery with
+     * the oldest unacknowledged message.
+     * <p>Recovering a session causes it to take the following actions:
+     * <ul>
+     * <li>Stop message delivery.
+     * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
+     * <li>Restart the delivery sequence including all unacknowledged messages that had been
+     * previously delivered.
+     * Redelivered messages do not have to be delivered in exactly their original delivery order.
+     * </ul>
+     *
+     * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
+     *                      Not that this does not necessarily mean that the recovery has failed, but simply that it is
+     *                      not possible to tell if it has or not.
+     */
+    public void recover() throws JMSException
+    {
+        // Ensure that the session is open.
+        checkNotClosed();
+        // we are recovering
+        _inRecovery = true;
+        // Ensure that the session is not transacted.
+        if (getTransacted())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Trying to recover a transacted Session, throwing IllegalStateException");
+            }
+            throw new IllegalStateException("Session is transacted");
+        }
+        // release all unack messages
+        RangeSet ranges = new RangeSet();
+        for (QpidMessage message : _unacknowledgedMessages)
+        {
+            // release this message           
+            ranges.add(message.getMessageTransferId());
+        }
+        getQpidSession().messageRelease(ranges);
+    }
+
+    /**
+     * Returns the session's distinguished message listener (optional).
+     * <p>This is an expert facility used only by Application Servers.
+     * <p> This is an optional operation that is not yet supported
+     *
+     * @return The message listener associated with this session.
+     * @throws JMSException If getting the message listener fails due to an internal error.
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        checkNotClosed();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(
+                    "Getting session's distinguished message listener, not supported," + " throwing UnsupportedOperationException");
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Sets the session's distinguished message listener.
+     * <p>This is an expert facility used only by Application Servers.
+     * <p> This is an optional operation that is not yet supported
+     *
+     * @param messageListener The message listener to associate with this session
+     * @throws JMSException If setting the message listener fails due to an internal error.
+     */
+    public void setMessageListener(MessageListener messageListener) throws JMSException
+    {
+        checkNotClosed();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(
+                    "Setting the session's distinguished message listener, not supported," + " throwing UnsupportedOperationException");
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Optional operation, intended to be used only by Application Servers,
+     * not by ordinary JMS clients.
+     * <p> This is an optional operation that is not yet supported
+     */
+    public void run()
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Running this session, not supported," + " throwing UnsupportedOperationException");
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Creates a MessageProducer to send messages to the specified destination.
+     *
+     * @param destination the Destination to send messages to, or null if this is a producer
+     *                    which does not have a specified destination.
+     * @return A new MessageProducer
+     * @throws JMSException                If the session fails to create a MessageProducer
+     *                                     due to some internal error.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public MessageProducer createProducer(Destination destination) throws JMSException
+    {
+        checkNotClosed();
+        MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
+        // register this actor with the session
+        _messageActors.put(producer.getMessageActorID(), producer);
+        return producer;
+    }
+
+    /**
+     * Creates a MessageConsumer for the specified destination.
+     *
+     * @param destination The <code>Destination</code> to access
+     * @return A new MessageConsumer for the specified destination.
+     * @throws JMSException                If the session fails to create a MessageConsumer due to some internal error.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public MessageConsumer createConsumer(Destination destination) throws JMSException
+    {
+        return createConsumer(destination, null);
+    }
+
+    /**
+     * Creates a MessageConsumer for the specified destination, using a message selector.
+     *
+     * @param destination     The <code>Destination</code> to access
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @return A new MessageConsumer for the specified destination.
+     * @throws JMSException                If the session fails to create a MessageConsumer due to some internal error.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
+    {
+        return createConsumer(destination, messageSelector, false);
+    }
+
+    /**
+     * Creates MessageConsumer for the specified destination, using a message selector.
+     * <p> This method can specify whether messages published by its own connection should
+     * be delivered to it, if the destination is a topic.
+     * <p/>
+     * <P>In some cases, a connection may both publish and subscribe to a topic. The consumer
+     * NoLocal attribute allows a consumer to inhibit the delivery of messages published by its
+     * own connection. The default value for this attribute is False.
+     *
+     * @param destination     The <code>Destination</code> to access
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @param noLocal         If true, and the destination is a topic, inhibits the delivery of messages published
+     *                        by its own connection.
+     * @return A new MessageConsumer for the specified destination.
+     * @throws JMSException                If the session fails to create a MessageConsumer due to some internal error.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(destination);
+        MessageConsumerImpl consumer;
+        try
+        {
+            consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,
+                                               String.valueOf(_consumerTag.incrementAndGet()));
+        }
+        catch (Exception e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating consumer.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        // register this actor with the session
+        _messageActors.put(consumer.getMessageActorID(), consumer);
+        return consumer;
+    }
+
+    /**
+     * Creates a queue identity by a given name.
+     * <P>This facility is provided for the rare cases where clients need to
+     * dynamically manipulate queue identity. It allows the creation of a
+     * queue identity with a provider-specific name. Clients that depend
+     * on this ability are not portable.
+     * <P>Note that this method is not for creating the physical queue.
+     * The physical creation of queues is an administrative task and is not
+     * to be initiated by the JMS API. The one exception is the
+     * creation of temporary queues, which is accomplished with the
+     * <code>createTemporaryQueue</code> method.
+     *
+     * @param queueName the name of this <code>Queue</code>
+     * @return a <code>Queue</code> with the given name
+     * @throws JMSException If the session fails to create a queue due to some internal error.
+     */
+    public Queue createQueue(String queueName) throws JMSException
+    {
+        checkNotClosed();
+        Queue result;
+        try
+        {
+            result = new QueueImpl(this, queueName);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Queue.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    /**
+     * Creates a topic identity given a Topicname.
+     * <P>This facility is provided for the rare cases where clients need to
+     * dynamically manipulate queue identity. It allows the creation of a
+     * queue identity with a provider-specific name. Clients that depend
+     * on this ability are not portable.
+     * <P>Note that this method is not for creating the physical queue.
+     * The physical creation of queues is an administrative task and is not
+     * to be initiated by the JMS API. The one exception is the
+     * creation of temporary queues, which is accomplished with the
+     * <code>createTemporaryTopic</code> method.
+     *
+     * @param topicName The name of this <code>Topic</code>
+     * @return a <code>Topic</code> with the given name
+     * @throws JMSException If the session fails to create a topic due to some internal error.
+     */
+    public Topic createTopic(String topicName) throws JMSException
+    {
+        checkNotClosed();
+        Topic result;
+        try
+        {
+            result = new TopicImpl(this, topicName);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Topic.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    /**
+     * Creates a durable subscriber to the specified topic,
+     *
+     * @param topic The non-temporary <code>Topic</code> to subscribe to.
+     * @param name  The name used to identify this subscription.
+     * @return A durable subscriber to the specified topic,
+     * @throws JMSException                If creating a subscriber fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid topic is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+    {
+        // by default, use a null messageselector and set noLocal to falsen
+        return createDurableSubscriber(topic, name, null, false);
+    }
+
+    /**
+     * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
+     * published by its
+     * own connection should be delivered to it.
+     * <p> A client can change an existing durable subscription by creating a durable <code>TopicSubscriber</code> with
+     * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
+     * unsubscribing (deleting) the old one and creating a new one.
+     *
+     * @param topic           The non-temporary <code>Topic</code> to subscribe to.
+     * @param name            The name used to identify this subscription.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @param noLocal         If set, inhibits the delivery of messages published by its own connection
+     * @return A durable subscriber to the specified topic,
+     * @throws JMSException                If creating a subscriber fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid topic is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(topic);
+        TopicSubscriberImpl subscriber;
+        try
+        {
+            subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
+                                                 _connection.getClientID() + ":" + name,
+                                                 String.valueOf(_consumerTag.incrementAndGet()));
+        }
+        catch (Exception e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Durable Subscriber.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        _messageActors.put(subscriber.getMessageActorID(), subscriber);
+        return subscriber;
+    }
+
+    /**
+     * Create a QueueBrowser to peek at the messages on the specified queue.
+     *
+     * @param queue The <code>Queue</code> to browse.
+     * @return A QueueBrowser.
+     * @throws JMSException                If creating a browser fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     */
+    public QueueBrowser createBrowser(Queue queue) throws JMSException
+    {
+        return createBrowser(queue, null);
+    }
+
+    /**
+     * Create a QueueBrowser to peek at the messages on the specified queue using a message selector.
+     *
+     * @param queue           The <code>Queue</code> to browse.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @return A QueueBrowser.
+     * @throws JMSException                If creating a browser fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(queue);
+        QueueBrowserImpl browser;
+        try
+        {
+            browser =
+                    new QueueBrowserImpl(this, queue, messageSelector, String.valueOf(_consumerTag.incrementAndGet()));
+        }
+        catch (Exception e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Durable Browser.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        // register this actor with the session
+        _messageActors.put(browser.getMessageActorID(), browser);
+        return browser;
+    }
+
+    /**
+     * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier.
+     *
+     * @return A temporary queue.
+     * @throws JMSException If creating the temporary queue fails due to some internal error.
+     */
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        TemporaryQueue result;
+        try
+        {
+            result = new TemporaryQueueImpl(this);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Durable Temporary Queue.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    /**
+     * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier.
+     *
+     * @return A temporary topic.
+     * @throws JMSException If creating the temporary topic fails due to some internal error.
+     */
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        TemporaryTopic result;
+        try
+        {
+            result = new TemporaryTopicImpl(this);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Problem when creating Durable Temporary Topic.", e);
+            }
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    /**
+     * Unsubscribes a durable subscription that has been created by a client.
+     * <p/>
+     * <P>This method deletes the state being maintained on behalf of the
+     * subscriber by its provider.
+     * <p/>
+     * <P>It is erroneous for a client to delete a durable subscription
+     * while there is an active <code>TopicSubscriber</code> for the
+     * subscription, or while a consumed message is part of a pending
+     * transaction or has not been acknowledged in the session.
+     *
+     * @param name the name used to identify this subscription
+     * @throws JMSException                if the session fails to unsubscribe to the durable subscription due to some internal error.
+     * @throws InvalidDestinationException if an invalid subscription name
+     *                                     is specified.
+     */
+    public void unsubscribe(String name) throws JMSException
+    {
+        checkNotClosed();
+    }
+
+    /**
+     * Get the latest thrown exception.
+     *
+     * @return The latest thrown exception.
+     */
+    public synchronized QpidException getCurrentException()
+    {
+        QpidException result = _currentException;
+        _currentException = null;
+        return result;
+    }
+    //----- Protected methods
+
+    /**
+     * Remove a message actor form this session
+     * <p> This method is called when an actor is independently closed.
+     *
+     * @param messageActor The closed actor.
+     */
+    protected void closeMessageActor(MessageActor messageActor)
+    {
+        _messageActors.remove(messageActor.getMessageActorID());
+    }
+
+    /**
+     * Idincates whether this session is stopped.
+     *
+     * @return True is this session is stopped, false otherwise.
+     */
+    protected boolean isStopped()
+    {
+        return _isStopped;
+    }
+
+    /**
+     * Start the flow of message to this session.
+     *
+     * @throws Exception If starting the session fails due to some communication error.
+     */
+    protected synchronized void start() throws Exception
+    {
+        if (_isStopped)
+        {
+            // start all the MessageActors
+            for (MessageActor messageActor : _messageActors.values())
+            {
+                messageActor.start();
+            }
+            startDispatchThread();
+        }
+    }
+
+    /**
+     * Restart delivery of asynch messages
+     */
+    private void startDispatchThread()
+    {
+        synchronized (_stoppingLock)
+        {
+            _isStopped = false;
+            _stoppingLock.notify();
+        }
+        synchronized (_stoppingJoin)
+        {
+            _hasStopped = false;
+        }
+    }
+
+    /**
+     * Stop the flow of message to this session.
+     *
+     * @throws Exception If stopping the session fails due to some communication error.
+     */
+    protected synchronized void stop() throws Exception
+    {
+        if (!_isClosing && !_isStopped)
+        {
+            // stop all the MessageActors
+            for (MessageActor messageActor : _messageActors.values())
+            {
+                messageActor.stop();
+            }
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _isStopped = true;
+                // unlock the sessionThread that will then wait on _stoppingLock
+                _incomingAsynchronousMessages.notifyAll();
+            }
+            // wait for the sessionThread to stop processing messages
+            synchronized (_stoppingJoin)
+            {
+                while (!_hasStopped)
+                {
+                    try
+                    {
+                        _stoppingJoin.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        /* ignore */
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Notify this session that a message is processed
+     *
+     * @param message The processed message.
+     */
+    protected void preProcessMessage(QpidMessage message)
+    {
+        _inRecovery = false;
+    }
+
+    /**
+     * Dispatch this message to this session asynchronous consumers
+     *
+     * @param consumerID The consumer ID.
+     * @param message    The message to be dispatched.
+     */
+    public void dispatchMessage(String consumerID, QpidMessage message)
+    {
+        synchronized (_incomingAsynchronousMessages)
+        {
+            _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message));
+            _incomingAsynchronousMessages.notifyAll();
+        }
+    }
+
+    /**
+     * Indicate whether this session is recovering .
+     *
+     * @return true if this session is recovering.
+     */
+    protected boolean isInRecovery()
+    {
+        return _inRecovery;
+    }
+
+    /**
+     * Validate that the Session is not closed.
+     * <p/>
+     * If the Session has been closed, throw a IllegalStateException. This behaviour is
+     * required by the JMS specification.
+     *
+     * @throws IllegalStateException If the session is closed.
+     */
+    protected void checkNotClosed() throws IllegalStateException
+    {
+        if (_isClosed)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session has been closed. Cannot invoke any further operations.");
+            }
+            throw new javax.jms.IllegalStateException("Session has been closed. Cannot invoke any further operations.");
+        }
+    }
+
+    /**
+     * Validate that the destination is valid i.e. it is not null
+     *
+     * @param dest The destination to be checked
+     * @throws InvalidDestinationException If the destination not valid.
+     */
+    protected void checkDestination(Destination dest) throws InvalidDestinationException
+    {
+        if (dest == null)
+        {
+            throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest,
+                                                            "Invalid destination");
+        }
+    }
+
+    /**
+     * A session keeps the list of unack messages only when the ack mode is
+     * set to client ack mode. Otherwise messages are always ack.
+     * <p> We can use an ack heuristic for  dups ok mode where bunch of messages are ack.
+     * This has to be done.
+     *
+     * @param message The message to be acknowledged.
+     * @throws JMSException If the message cannot be acknowledged due to an internal error.
+     */
+    protected void acknowledgeMessage(QpidMessage message) throws JMSException
+    {
+        if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            // messages will be acknowldeged by the client application.
+            // store this message for acknowledging it afterward
+            synchronized (_unacknowledgedMessages)
+            {
+                _unacknowledgedMessages.add(message);
+            }
+        }
+        else
+        {
+            // acknowledge this message
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+            getQpidSession().messageAcknowledge(ranges);
+        }
+        //tobedone: Implement DUPS OK heuristic
+    }
+
+    /**
+     * This method is called when a message is acked.
+     * <p/>
+     * <P>Acknowledgment of a message automatically acknowledges all
+     * messages previously received by the session. Clients may
+     * individually acknowledge messages or they may choose to acknowledge
+     * messages in application defined groups (which is done by acknowledging
+     * the last received message in the group).
+     *
+     * @throws JMSException If this method is called on a closed session.
+     */
+    public void acknowledge() throws JMSException
+    {
+        checkNotClosed();
+        if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            synchronized (_unacknowledgedMessages)
+            {
+                for (QpidMessage message : _unacknowledgedMessages)
+                {
+                    // acknowledge this message
+                    RangeSet ranges = new RangeSet();
+                    ranges.add(message.getMessageTransferId());
+                    getQpidSession().messageAcknowledge(ranges);
+                }
+                //empty the list of unack messages
+                _unacknowledgedMessages.clear();
+            }
+        }
+        //else there is no effect
+    }
+
+    /**
+     * Access to the underlying Qpid Session
+     *
+     * @return The associated Qpid Session.
+     */
+    protected org.apache.qpidity.nclient.Session getQpidSession()
+    {
+        return _qpidSession;
+    }
+
+    /**
+     * Get this session's conneciton
+     *
+     * @return This session's connection
+     */
+    protected ConnectionImpl getConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * sync and return the potential exception
+     *
+     * @throws QpidException If an exception has been thrown by the broker.
+     */
+    protected void testQpidException() throws QpidException
+    {
+        //_qpidSession.sync();
+        QpidException qe = getCurrentException();
+        if (qe != null)
+        {
+            throw qe;
+        }
+    }
+
+    //------ Private Methods
+    /**
+     * Close the producer and the consumers of this session
+     *
+     * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error.
+     */
+    private void closeAllMessageActors() throws JMSException
+    {
+        for (MessageActor messageActor : _messageActors.values())
+        {
+            messageActor.closeMessageActor();
+        }
+    }
+
+    /**
+     * create and start the MessageDispatcherThread.
+     */
+    private synchronized void initMessageDispatcherThread()
+    {
+        // Create and start a MessageDispatcherThread
+        // This thread is dispatching messages to the async consumers
+        _messageDispatcherThread = new MessageDispatcherThread();
+        _messageDispatcherThread.start();
+    }
+
+    //------ Inner classes
+
+    /**
+     * Lstener for qpid protocol exceptions
+     */
+    private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener
+    {
+        public void onException(QpidException exception)
+        {
+            synchronized (this)
+            {
+                //todo check the error code for finding out if we need to notify the
+                // JMS connection exception listener
+                _currentException = exception;
+            }
+        }
+    }
+
+    /**
+     * Convenient class for storing incoming messages associated with a consumer ID.
+     * <p> Those messages are enqueued in _incomingAsynchronousMessages
+     */
+    private class IncomingMessage
+    {
+        // The consumer ID
+        private String _consumerId;
+        // The message
+        private QpidMessage _message;
+
+        //-- constructor
+        /**
+         * Creat a new incoming message
+         *
+         * @param consumerId The consumer ID
+         * @param message    The message to be delivered
+         */
+        IncomingMessage(String consumerId, QpidMessage message)
+        {
+            _consumerId = consumerId;
+            _message = message;
+        }
+
+        // Getters
+        /**
+         * Get the consumer ID
+         *
+         * @return The consumer ID for this message
+         */
+        public String getConsumerId()
+        {
+            return _consumerId;
+        }
+
+        /**
+         * Get the message.
+         *
+         * @return The message.
+         */
+        public QpidMessage getMessage()
+        {
+            return _message;
+        }
+    }
+
+    /**
+     * A MessageDispatcherThread is attached to every SessionImpl.
+     * <p/>
+     * This thread is responsible for removing messages from m_incomingMessages and
+     * dispatching them to the appropriate MessageConsumer.
+     * <p> Messages have to be dispatched serially.
+     */
+    private class MessageDispatcherThread extends Thread
+    {
+        //--- Constructor
+        /**
+         * Create a Deamon thread for dispatching messages to this session listeners.
+         */
+        MessageDispatcherThread()
+        {
+            super("MessageDispatcher");
+            // this thread is Deamon
+            setDaemon(true);
+        }
+
+        /**
+         * Use to run this thread.
+         */
+        public void run()
+        {
+            IncomingMessage message = null;
+            // deliver messages to asynchronous consumers until the stop flag is set.
+            do
+            {
+                // When this session is not closing and and stopped
+                // then this thread needs to wait until messages are delivered.
+                synchronized (_incomingAsynchronousMessages)
+                {
+                    while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty())
+                    {
+                        try
+                        {
+                            _incomingAsynchronousMessages.wait();
+                        }
+                        catch (InterruptedException ie)
+                        {
+                            /* ignore */
+                        }
+                    }
+                }
+                // If this session is stopped then we need to wait on the stoppingLock
+                synchronized (_stoppingLock)
+                {
+                    try
+                    {
+                        while (_isStopped)
+                        {
+                            // if the session is stopped we have to notify the stopper thread
+                            synchronized (_stoppingJoin)
+                            {
+                                _hasStopped = true;
+                                _stoppingJoin.notify();
+                            }
+                            _stoppingLock.wait();
+                        }
+                    }
+                    catch (Exception ie)
+                    {
+                        /* ignore */
+                    }
+                }
+                synchronized (_incomingAsynchronousMessages)
+                {
+                    if (!_isClosing && !_incomingAsynchronousMessages.isEmpty())
+                    {
+                        message = _incomingAsynchronousMessages.getFirst();
+                    }
+                }
+
+                if (message != null)
+                {
+                    MessageConsumerImpl mc;
+                    synchronized (_messageActors)
+                    {
+                        mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId());
+                    }
+                    if (mc != null)
+                    {
+                        try
+                        {
+                            // mc.onMessage(message.getMessage());
+                            mc.notifyMessageListener(message.getMessage());
+                        }
+                        catch (RuntimeException t)
+                        {
+                            // the JMS specification tells us to flag that to the client!
+                            _logger.error(
+                                    "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
+                        }
+                    }
+                }
+                message = null;
+            }
+            while (!_isClosing);   // repeat as long as this session is not closing
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.JMSException;
+
+/**
+ * Interface to abstract functionalities of temporary destinations.
+ */
+public interface TemporaryDestination
+{
+    /**
+     * Delete this temporary destination.
+     *
+      * @throws javax.jms.JMSException If the temporary destination cannot be deleted due to some internal error.
+     */
+    public void delete() throws JMSException;
+
+     /**
+     * Indicate whether this temporary destination is deleted
+     *  @return  True is this temporary destination is deleted, false otherwise 
+     */
+    public boolean isdeleted();
+
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.TemporaryQueue;
+import javax.jms.JMSException;
+import java.util.UUID;
+
+/**
+ * Implements TemporaryQueue
+ */
+public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination
+{
+    /**
+     * Indicates whether this temporary queue is deleted.
+     */
+    private boolean _isDeleted;
+
+    /**
+     * The session used to create this destination
+     */
+    private SessionImpl _session;
+
+    //--- constructor
+    /**
+     * Create a new TemporaryQueueImpl.
+     *
+     * @param session The session used to create this TemporaryQueueImpl.
+     * @throws QpidException If creating the TemporaryQueueImpl fails due to some error.
+     */
+    protected TemporaryQueueImpl(SessionImpl session) throws QpidException
+    {
+        super("TempQueue-" + UUID.randomUUID());
+        // temporary destinations do not have names
+        _isAutoDelete = false;
+        _isDurable = false;
+        _isExclusive = false;
+        _isDeleted = false;
+        _session = session;
+        // we must create this queue
+        registerQueue(session, true);
+    }
+
+    //-- TemporaryDestination Interface
+    /**
+     * Specify whether this temporary destination is deleted.
+     *
+     * @return true is this temporary destination is deleted.
+     */
+    public boolean isdeleted()
+    {
+        return _isDeleted;
+    }
+
+    //-- TemporaryQueue Interface
+    /**
+     * Delete this temporary destinaiton
+     *
+     * @throws JMSException If deleting this temporary queue fails due to some error.
+     */
+    public void delete() throws JMSException
+    {
+        if (!_isDeleted)
+        {
+            _session.getQpidSession().queueDelete(_queueName);
+        }
+        _isDeleted = true;
+    }
+
+}
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.TemporaryTopic;
+import javax.jms.JMSException;
+import java.util.UUID;
+
+
+/**
+ * Implements TemporaryTopic
+ */
+public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, TemporaryDestination
+{
+    /**
+     * Indicates whether this temporary topic is deleted.
+     */
+    private boolean _isDeleted = false;
+
+    /**
+     * The session used to create this destination
+     */
+    private SessionImpl _session;
+
+    //--- constructor
+    /**
+     * Create a new TemporaryTopicImpl with a given name.
+     *
+     * @param session The session used to create this TemporaryTopicImpl.
+     * @throws QpidException If creating the TemporaryTopicImpl fails due to some error.
+     */
+    protected TemporaryTopicImpl(SessionImpl session) throws QpidException
+    {
+        // temporary destinations do not have names.
+        super(session, "TemporayTopic-" + UUID.randomUUID());
+        _session = session;
+    }
+
+    //-- TemporaryDestination Interface
+    public boolean isdeleted()
+    {
+        return _isDeleted;
+    }
+
+    //-- TemporaryTopic Interface
+    public void delete() throws JMSException
+    {
+        if (!_isDeleted)
+        {
+            _session.getQpidSession().queueDelete(_queueName);
+        }
+        _isDeleted = true;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,35 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.TopicConnection;
+
+/**
+ * Implements javax.njms.TopicConnection
+ */
+public class TopicConnectionImpl extends ConnectionImpl implements TopicConnection
+{
+    //-- constructor
+    public TopicConnectionImpl(String host, int port, String virtualHost, String username, String password)
+            throws QpidException
+    {
+        super(host, port, virtualHost, username, password);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,129 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.url.BindingURL;
+
+import javax.jms.Topic;
+import java.util.UUID;
+
+/**
+ * Implementation of the javax.njms.Topic interface.
+ */
+public class TopicImpl extends DestinationImpl implements Topic
+{
+    //--- Constructor
+    /**
+     * Create a new TopicImpl with a given name.
+     *
+     * @param name    The name of this topic
+     * @param session The session used to create this queue.
+     * @throws QpidException If the topic name is not valid
+     */
+    protected TopicImpl(SessionImpl session, String name) throws QpidException
+    {
+        super(name);
+        _queueName = "Topic-" + UUID.randomUUID();
+        _routingKey = name;
+        _destinationName = name;
+        _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+        _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+        _isAutoDelete = true;
+        _isDurable = false;
+        _isExclusive = true;
+        checkTopicExists(session);
+    }
+
+    /**
+     * Create a new TopicImpl with a given name.
+     *
+     * @param name The name of this topic
+     * @throws QpidException If the topic name is not valid
+     */
+    public TopicImpl(String name) throws QpidException
+    {
+        super(name);
+        _queueName = "Topic-" + UUID.randomUUID();
+        _routingKey = name;
+        _destinationName = name;
+        _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+        _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+        _isAutoDelete = true;
+        _isDurable = false;
+        _isExclusive = true;
+    }
+
+    /**
+     * Create a TopicImpl from a binding URL
+     *
+     * @param session The session used to create this Topic.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(binding);
+        checkTopicExists(session);
+    }
+
+
+    /**
+     * Create a TopicImpl from a binding URL
+     *
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    public TopicImpl(BindingURL binding) throws QpidException
+    {
+        super(binding);
+    }
+
+    //--- javax.jsm.Topic Interface
+    /**
+     * Gets the name of this topic.
+     *
+     * @return This topic's name.
+     */
+    public String getTopicName()
+    {
+        return _destinationName;
+    }
+
+    /**
+     * Check that this exchange exists
+     *
+     * @param session The session used to create this Topic.
+     * @throws QpidException If this exchange does not exists on the broker.
+     */
+    private void checkTopicExists(SessionImpl session) throws QpidException
+    {
+        // test if this exchange exist on the broker
+        session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
+        // wait for the broker response
+        System.out.println("Checking for exchange");
+        
+        session.getQpidSession().sync();
+        
+        System.out.println("Calling sync()");
+        // todo get the exception
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+
+/**
+ * Implements TopicPublisher
+ */
+public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher
+{
+    //--- Constructor
+    /**
+     * Create a TopicPublisherImpl.
+     *
+     * @param session The session from which the TopicPublisherImpl is instantiated
+     * @param topic   The default topic for this TopicPublisherImpl
+     * @throws JMSException If the TopicPublisherImpl cannot be created due to some internal error.
+     */
+    protected TopicPublisherImpl(SessionImpl session, Topic topic) throws JMSException
+    {
+        super(session, (DestinationImpl) topic);
+    }
+
+    //--- Interface javax.njms.TopicPublisher
+    /**
+     * Get the topic associated with this TopicPublisher.
+     *
+     * @return This publisher's topic
+     * @throws JMSException If getting the topic fails due to some internal error.
+     */
+    public Topic getTopic() throws JMSException
+    {
+        return (Topic) getDestination();
+    }
+
+
+    /**
+     * Publish a message to the topic using the default delivery mode, priority and time to live.
+     *
+     * @param message The message to publish
+     * @throws JMSException If publishing the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If an invalid topic is specified.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If that publisher topic was not specified at creation time.
+     */
+    public void publish(Message message) throws JMSException
+    {
+        super.send(message);
+    }
+
+    /**
+     * Publish a message to the topic, specifying delivery mode, priority and time to live.
+     *
+     * @param message      The message to publish
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException If publishing the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If an invalid topic is specified.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If that publisher topic was not specified at creation time.
+     */
+    public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+
+    /**
+     * Publish a message to a topic for an unidentified message producer.
+     * Uses this TopicPublisher's default delivery mode, priority and time to live.
+     *
+     * @param topic   The topic to publish this message to
+     * @param message The message to publish
+     * @throws JMSException If publishing the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If an invalid topic is specified.
+     */
+    public void publish(Topic topic, Message message) throws JMSException
+    {
+        super.send(topic, message);
+    }
+
+    /**
+     * Publishes a message to a topic for an unidentified message
+     * producer, specifying delivery mode, priority and time to live.
+     *
+     * @param topic        The topic to publish this message to
+     * @param message      The message to publish
+     * @param deliveryMode The delivery mode
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException If publishing the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If an invalid topic is specified.
+     */
+    public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws
+                                                                                                       JMSException
+    {
+        super.send(topic, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,155 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * Implements  TopicSession
+ */
+public class TopicSessionImpl extends SessionImpl implements TopicSession
+{
+    //-- constructor
+    /**
+     * Create a new TopicSessionImpl.
+     *
+     * @param connection      The ConnectionImpl object from which the Session is created.
+     * @param transacted      Specifiy whether this session is transacted?
+     * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+     *                        {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter
+     *                        is true.
+     * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+     * @throws javax.jms.JMSException         In case of internal error.
+     */
+    protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException
+    {
+        super(connection, transacted, acknowledgeMode,false);
+    }
+
+    //-- Overwritten methods
+    /**
+     * Create a QueueBrowser.
+     *
+     * @param queue           The <CODE>Queue</CODE> to browse.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
+    }
+
+    /**
+     * Create a QueueBrowser.
+     *
+     * @param queue The <CODE>Queue</CODE> to browse.
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
+    }
+
+    /**
+     * Creates a temporary queue.
+     *
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createTemporaryQueue from TopicSession");
+    }
+
+    /**
+     * Creates a queue identity by a given name.
+     *
+     * @param queueName the name of this <CODE>Queue</CODE>
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public Queue createQueue(String queueName) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createQueue from TopicSession");
+    }
+
+    //--- Interface TopicSession
+    /**
+     * Create a publisher for the specified topic.
+     *
+     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified publisher.
+     * @throws JMSException                If the creating a publisher fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid topic is specified.
+     */
+    public TopicPublisher createPublisher(Topic topic) throws JMSException
+    {
+
+        checkNotClosed();
+        // we do not check the destination topic here, since unidentified publishers are allowed.
+        return new TopicPublisherImpl(this, topic);
+    }
+
+    /**
+     * Creates a nondurable subscriber to the specified topic.
+     *
+     * @param topic The Topic to subscribe to
+     * @throws JMSException                If creating a subscriber fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid topic is specified.
+     */
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
+    {
+        return createSubscriber(topic, null, false);
+    }
+
+    /**
+     * Creates a nondurable subscriber to the specified topic, using a
+     * message selector or specifying whether messages published by its
+     * own connection should be delivered to it.
+     *
+     * @param topic           The Topic to subscribe to
+     * @param messageSelector A value of null or an empty string indicates that there is no message selector.
+     * @param noLocal         If true then inhibits the delivery of messages published by this subscriber's connection.
+     * @throws JMSException                If creating a subscriber fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid topic is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(topic);
+        TopicSubscriber topicSubscriber;
+        try
+        {
+            topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return topicSubscriber;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.TopicSubscriber;
+import javax.jms.Topic;
+import javax.jms.JMSException;
+
+/**
+ * Implementation of the JMS TopicSubscriber interface.
+ */
+public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
+{
+    //--- Constructor
+    /**
+     * Create a new TopicSubscriberImpl.
+     *
+     * @param session          The session of this topic subscriber.
+     * @param topic            The default topic for this TopicSubscriberImpl
+     * @param messageSelector  The MessageSelector
+     * @param noLocal          If true inhibits the delivery of messages published by its own connection.
+     * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+     *                         If this value is null, a non-durable subscription is created.
+     * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
+     */
+    protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
+                                  String subscriptionName,String consumerTag) throws Exception
+    {
+        super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag);
+    }
+
+    //---  javax.njms.TopicSubscriber interface
+    /**
+     * Get the Topic associated with this subscriber.
+     *
+     * @return This subscriber's Topic
+     * @throws JMSException if getting the topic for this topicSubscriber fails due to some internal error.
+     */
+    public Topic getTopic() throws JMSException
+    {
+        checkNotClosed();
+        return (TopicImpl) _destination;
+    }
+
+
+    /**
+     * Get NoLocal for this subscriber.
+     *
+     * @return True if locally published messages are being inhibited, false otherwise
+     * @throws JMSException If getting NoLocal for this topic subscriber fails due to some internal error.
+     */
+    public boolean getNoLocal() throws JMSException
+    {
+        checkNotClosed();
+        return _noLocal;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAConnection;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
+/**
+ * This class implements the javax.njms.XAConnection interface
+ */
+public class XAConnectionImpl extends ConnectionImpl implements XAConnection
+{
+    //-- constructor
+    /**
+     * Create a XAConnection.
+     *
+     * @param host        The broker host name.
+     * @param port        The port on which the broker is listening for connection.
+     * @param virtualHost The virtual host on which the broker is deployed.
+     * @param username    The user name used of user identification.
+     * @param password    The password name used of user identification.
+     * @throws QpidException If creating a connection fails due to some internal error.
+     */    
+    protected XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException
+    {
+        super(host, port, virtualHost, username, password);
+    }
+
+    //-- interface XAConnection
+    /**
+     * Creates an XASession.
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the XAConnectiono fails to create an XASession due to
+     *                      some internal error.
+     */
+    public synchronized XASession createXASession() throws JMSException
+    {
+        checkNotClosed();
+        XASessionImpl xasession;
+        try
+        {
+            xasession = new XASessionImpl(this);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        // add this session with the list of session that are handled by this connection
+        _sessions.add(xasession);
+        return xasession;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAQueueConnection;
+import javax.jms.JMSException;
+import javax.jms.XAQueueSession;
+
+/**
+ * Implements  XAQueueConnection
+ */
+public class XAQueueConnectionImpl extends XAConnectionImpl implements XAQueueConnection
+{
+    //-- constructor
+    /**
+     * Create a XAQueueConnection.
+     *
+     * @param host        The broker host name.
+     * @param port        The port on which the broker is listening for connection.
+     * @param virtualHost The virtual host on which the broker is deployed.
+     * @param username    The user name used of user identification.
+     * @param password    The password name used of user identification.
+     * @throws QpidException If creating a XAQueueConnection fails due to some internal error.
+     */
+    public XAQueueConnectionImpl(String host, int port, String virtualHost, String username, String password)
+            throws QpidException
+    {
+        super(host, port, virtualHost, username, password);
+    }
+
+    //-- Interface  XAQueueConnection
+    /**
+     * Creates an XAQueueSession.
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to
+     *                      some internal error.
+     */
+    public synchronized XAQueueSession createXAQueueSession() throws JMSException
+    {
+        checkNotClosed();
+        XAQueueSessionImpl xaQueueSession;
+        try
+        {
+            xaQueueSession = new XAQueueSessionImpl(this);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        // add this session with the list of session that are handled by this connection
+        _sessions.add(xaQueueSession);
+        return xaQueueSession;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native