You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/06 15:54:11 UTC

svn commit: r563134 [2/5] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: ./ impl/ jms/ jms/message/

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,182 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
+/**
+ * MessageActor is the superclass for MessageProducerImpl and MessageProducerImpl.
+ */
+public abstract class MessageActor
+{
+    /**
+     * Used for debugging.
+     */
+    protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+
+    /**
+     * Indicates whether this MessageActor is closed.
+     */
+    protected boolean _isClosed = false;
+
+    /**
+     * This messageActor's session
+     */
+    private SessionImpl _session;
+
+    /**
+     * The JMS destination this actor is set for.
+     */
+    DestinationImpl _destination;
+
+    /**
+     * Indicates that this actor is stopped
+     */
+    protected boolean _isStopped;
+
+    /**
+     * The ID of this actor for the session.
+     */
+    private String _messageActorID;
+
+    //-- Constructor
+
+    //TODO define the parameters
+
+    protected MessageActor()
+    {
+
+    }
+
+    protected MessageActor(SessionImpl session, DestinationImpl destination)
+    {
+        _session = session;
+        _destination = destination;
+    }
+
+    //--- public methods (part of the jms public API)
+    /**
+     * Closes the MessageActor and deregister it from its session.
+     *
+     * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        if (!_isClosed)
+        {
+            closeMessageActor();
+            // notify the session that this message actor is closing
+            _session.closeMessageActor(this);
+        }
+    }
+
+    //-- protected methods
+
+    /**
+     * Stop this message actor
+     *
+     * @throws Exception If the consumer cannot be stopped due to some internal error.
+     */
+    protected void stop() throws Exception
+    {
+        _isStopped = true;
+    }
+
+    /**
+     * Start this message Actor
+     *
+     * @throws Exception If the consumer cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+
+        _isStopped = false;
+
+    }
+
+    /**
+     * Check if this MessageActor is not closed.
+     * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException.
+     * <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
+     *
+     * @throws IllegalStateException if the MessageActor is closed
+     */
+    protected void checkNotClosed() throws IllegalStateException
+    {
+        if (_isClosed || _session == null)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Actor " + this + " is already closed");
+            }
+            throw new IllegalStateException("Actor " + this + " is already closed");
+        }
+        _session.checkNotClosed();
+    }
+
+    /**
+     * Closes a MessageActor.
+     * <p> This method is invoked when the session is closing or when this
+     * messageActor is closing.
+     *
+     * @throws JMSException If the MessaeActor cannot be closed due to some internal error.
+     */
+    protected void closeMessageActor() throws JMSException
+    {
+        if (!_isClosed)
+        {
+            try
+            {
+                // cancle this destination 
+                getSession().getQpidSession().messageCancel(getMessageActorID());
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            }
+            _isClosed = true;
+        }
+    }
+
+    /**
+     * Get the associated session object.
+     *
+     * @return This Actor's Session.
+     */
+    protected SessionImpl getSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the ID of this actor within its session.
+     *
+     * @return This actor ID.
+     */
+    protected String getMessageActorID()
+    {
+        return _messageActorID;
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,613 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.impl.MessagePartListenerAdapter;
+import org.apache.qpidity.MessagePartListener;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
+import javax.jms.*;
+
+/**
+ * Implementation of JMS message consumer
+ */
+public class MessageConsumerImpl extends MessageActor implements MessageConsumer
+{
+    // we can receive up to 100 messages for an asynchronous listener
+    public static final int MAX_MESSAGE_TRANSFERRED = 100;
+
+    /**
+     * This MessageConsumer's messageselector.
+     */
+    private String _messageSelector = null;
+
+    /**
+     * The message selector filter associated with this consumer message selector
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * NoLocal
+     * If true, and the destination is a topic then inhibits the delivery of messages published
+     * by its own connection.  The behavior for NoLocal is not specified if the destination is a queue.
+     */
+    protected boolean _noLocal;
+
+    /**
+     * The subscription name
+     */
+    protected String _subscriptionName;
+
+    /**
+     * Indicates whether this consumer receives pre-acquired messages
+     */
+    private boolean _preAcquire = true;
+
+    /**
+     * A MessagePartListener set up for this consumer.
+     */
+    private MessageListener _messageListener;
+
+    /**
+     * The synchronous message just delivered
+     */
+    private QpidMessage _incomingMessage;
+
+    /**
+     * A lcok on the syncrhonous message
+     */
+    private final Object _incomingMessageLock = new Object();
+
+    /**
+     * Indicates that this consumer is receiving a synch message
+     */
+    private boolean _isReceiving = false;
+
+    /**
+     * Indicates that a nowait is receiving a message.
+     */
+    private boolean _isNoWaitIsReceiving = false;
+
+    /**
+     * Number of mesages received asynchronously
+     * Nether exceed MAX_MESSAGE_TRANSFERRED
+     */
+    private int _messageAsyncrhonouslyReceived = 0;
+
+    //----- Constructors
+    /**
+     * Create a new MessageProducerImpl.
+     *
+     * @param session          The session from which the MessageProducerImpl is instantiated
+     * @param destination      The default destination for this MessageProducerImpl
+     * @param messageSelector  The message selector for this QueueReceiverImpl.
+     * @param noLocal          If true inhibits the delivery of messages published by its own connection.
+     * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+     *                         If this value is null, a non-durable subscription is created.
+     * @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
+     */
+    protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
+                                  boolean noLocal, String subscriptionName) throws Exception
+    {
+        super(session, destination);
+        if (messageSelector != null)
+        {
+            _messageSelector = messageSelector;
+            _filter = new JMSSelectorFilter(messageSelector);
+        }
+        _noLocal = noLocal;
+        _subscriptionName = subscriptionName;
+        _isStopped = getSession().isStopped();
+        // let's create a message part assembler
+        /**
+         * A Qpid message listener that pushes messages to this consumer session when this consumer is
+         * asynchronous or directly to this consumer when it is synchronously accessed.
+         */
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+
+        if (destination instanceof Queue)
+        {
+            // this is a queue we expect that this queue exists
+            getSession().getQpidSession()
+                    .messageSubscribe(destination.getName(), getMessageActorID(),
+                                      org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED,
+                                      // When the message selctor is set we do not acquire the messages
+                                      _messageSelector != null ? org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE,
+                                      messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
+            if (_messageSelector != null)
+            {
+                _preAcquire = false;
+            }
+        }
+        else
+        {
+            // this is a topic we need to create a temporary queue for this consumer
+            // unless this is a durable subscriber
+            String queueName;
+            if (subscriptionName != null)
+            {
+                // this ia a durable subscriber
+                // create a persistent queue for this subscriber
+                queueName = "topic-" + subscriptionName;
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
+            }
+            else
+            {
+                // this is a non durable subscriber
+                // create a temporary queue
+                queueName = "topic-" + getMessageActorID();
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
+            }
+            // bind this queue with the topic exchange
+            getSession().getQpidSession()
+                    .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null);
+            // subscribe to this topic 
+            getSession().getQpidSession()
+                    .messageSubscribe(queueName, getMessageActorID(),
+                                      org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED,
+                                      // We always acquire the messages
+                                      org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
+                                      _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+                                      // Request exclusive subscription access, meaning only this subscription
+                                      // can access the queue.
+                                      Option.EXCLUSIVE);
+
+        }
+        // set the flow mode
+        getSession().getQpidSession()
+                .messageFlowMode(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_MODE_CREDIT);
+    }
+
+    //----- Message consumer API
+    /**
+     * Gets this  MessageConsumer's message selector.
+     *
+     * @return This MessageConsumer's message selector, or null if no
+     *         message selector exists for the message consumer (that is, if
+     *         the message selector was not set or was set to null or the
+     *         empty string)
+     * @throws JMSException if getting the message selector fails due to some internal error.
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        checkNotClosed();
+        return _messageSelector;
+    }
+
+    /**
+     * Gets this MessageConsumer's <CODE>MessagePartListener</CODE>.
+     *
+     * @return The listener for the MessageConsumer, or null if no listener is set
+     * @throws JMSException if getting the message listener fails due to some internal error.
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        checkNotClosed();
+        return _messageListener;
+    }
+
+    /**
+     * Sets the MessageConsumer's <CODE>MessagePartListener</CODE>.
+     * <p> The JMS specification says:
+     * <P>Setting the message listener to null is the equivalent of
+     * unsetting the message listener for the message consumer.
+     * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
+     * while messages are being consumed by an existing listener
+     * or the consumer is being used to consume messages synchronously
+     * is undefined.
+     *
+     * @param messageListener The listener to which the messages are to be delivered
+     * @throws JMSException If setting the message listener fails due to some internal error.
+     */
+    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
+    {
+        // this method is synchronized as onMessage also access _messagelistener
+        // onMessage, getMessageListener and this method are the only synchronized methods
+        checkNotClosed();
+        try
+        {
+            _messageListener = messageListener;
+            if (messageListener != null)
+            {
+                resetAsynchMessageReceived();
+            }
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages
+     *
+     * @throws QpidException If there is a communication error
+     */
+    private void resetAsynchMessageReceived() throws QpidException
+    {
+        if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+        {
+            getSession().getQpidSession().messageStop(getMessageActorID());
+        }
+        _messageAsyncrhonouslyReceived = 0;
+        getSession().getQpidSession()
+                .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                             MAX_MESSAGE_TRANSFERRED);
+    }
+
+    /**
+     * Receive the next message produced for this message consumer.
+     * <P>This call blocks indefinitely until a message is produced or until this message consumer is closed.
+     *
+     * @return The next message produced for this message consumer, or
+     *         null if this message consumer is concurrently closed
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receive() throws JMSException
+    {
+        return receive(0);
+    }
+
+    /**
+     * Receive the next message that arrives within the specified timeout interval.
+     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+     * is closed.
+     * <p> A timeout of zero never expires, and the call blocks indefinitely.
+     * <p> A timeout less than 0 throws a JMSException.
+     *
+     * @param timeout The timeout value (in milliseconds)
+     * @return The next message that arrives within the specified timeout interval.
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receive(long timeout) throws JMSException
+    {
+        if (timeout < 0)
+        {
+            throw new JMSException("Invalid timeout value: " + timeout);
+        }
+        Message result;
+        try
+        {
+            result = internalReceive(timeout);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    /**
+     * Receive the next message if one is immediately available.
+     *
+     * @return the next message or null if one is not available.
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receiveNoWait() throws JMSException
+    {
+        Message result;
+        try
+        {
+            result = internalReceive(-1);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
+    }
+
+    // not public methods
+
+    /**
+     * Receive a synchronous message
+     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+     * is closed.
+     * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
+     * is closed)
+     * <p> A timeout less than 0 returns the next message or null if one is not available.
+     *
+     * @param timeout The timeout value (in milliseconds)
+     * @return the next message or null if one is not available.
+     * @throws Exception If receiving the next message fails due to some internal error.
+     */
+    private Message internalReceive(long timeout) throws Exception
+    {
+        checkNotClosed();
+        if (_messageListener != null)
+        {
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
+        }
+
+        Message result = null;
+        synchronized (_incomingMessageLock)
+        {
+            // This indicate to the delivery thread to deliver the message to this consumer
+            // as it can happens that a message is delivered after a receive operation as returned.
+            _isReceiving = true;
+            boolean received = false;
+            if (!_isStopped)
+            {
+                // if this consumer is stopped then this will be call when starting
+                getSession().getQpidSession()
+                        .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                received = getSession().getQpidSession().messageFlush(getMessageActorID());
+            }
+            if (!received && timeout < 0)
+            {
+                // this is a nowait and we havent received a message then we must immediatly return
+                result = null;
+            }
+            else
+            {
+                // right we need to let onMessage know that a nowait is potentially waiting for a message
+                if (timeout < 0)
+                {
+                    _isNoWaitIsReceiving = true;
+                }
+                while (_incomingMessage == null && !_isClosed)
+                {
+                    try
+                    {
+                        _incomingMessageLock.wait(timeout);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // do nothing
+                    }
+                }
+                if (_incomingMessage != null)
+                {
+                    result = _incomingMessage.getJMSMessage();
+                    // tell the session that a message is inprocess
+                    getSession().preProcessMessage(_incomingMessage);
+                    // tell the session to acknowledge this message (if required)
+                    getSession().acknowledgeMessage(_incomingMessage);
+                }
+                _incomingMessage = null;
+            }
+            // We now release any message received for this consumer
+            _isReceiving = false;
+            _isNoWaitIsReceiving = false;
+        }
+        return result;
+    }
+
+    /**
+     * Stop the delivery of messages to this consumer.
+     * <p>For asynchronous receiver, this operation blocks until the message listener
+     * finishes processing the current message,
+     *
+     * @throws Exception If the consumer cannot be stopped due to some internal error.
+     */
+    protected void stop() throws Exception
+    {
+        getSession().getQpidSession().messageStop(getMessageActorID());
+        _isStopped = true;
+    }
+
+    /**
+     * Start the delivery of messages to this consumer.
+     *
+     * @throws Exception If the consumer cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+        synchronized (_incomingMessageLock)
+        {
+            _isStopped = false;
+            if (_isReceiving)
+            {
+                // there is a synch call waiting for a message to be delivered
+                // so tell the broker to deliver a message
+                getSession().getQpidSession()
+                        .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                getSession().getQpidSession().messageFlush(getMessageActorID());
+            }
+        }
+    }
+
+    /**
+     * Deliver a message to this consumer.
+     *
+     * @param message The message delivered to this consumer.
+     */
+    protected synchronized void onMessage(QpidMessage message)
+    {
+        try
+        {
+            // if there is a message selector then we need to evaluate it.
+            boolean messageOk = true;
+            if (_messageSelector != null)
+            {
+                messageOk = _filter.matches(message.getJMSMessage());
+            }
+            if (!messageOk && _preAcquire)
+            {
+                // this is the case for topics
+                // We need to ack this message
+                acknowledgeMessage(message);
+            }
+            // now we need to acquire this message if needed
+            // this is the case of queue with a message selector set
+            if (!_preAcquire && messageOk)
+            {
+                messageOk = acquireMessage(message);
+            }
+
+            // if this consumer is synchronous then set the current message and
+            // notify the waiting thread
+            if (_messageListener == null)
+            {
+                synchronized (_incomingMessageLock)
+                {
+                    if (messageOk)
+                    {
+                        // we have received a proper message that we can deliver
+                        if (_isReceiving)
+                        {
+                            _incomingMessage = message;
+                            _incomingMessageLock.notify();
+                        }
+                        else
+                        {
+                            // this message has been received after a received as returned
+                            // we need to release it
+                            releaseMessage(message);
+                        }
+                    }
+                    else
+                    {
+                        // oups the message did not match the selector or we did not manage to acquire it
+                        // If the receiver is still waiting for a message
+                        // then we need to request a new one from the server
+                        if (_isReceiving)
+                        {
+                            getSession().getQpidSession()
+                                    .messageFlow(getMessageActorID(),
+                                                 org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                            boolean received = getSession().getQpidSession().messageFlush(getMessageActorID());
+                            if (!received && _isNoWaitIsReceiving)
+                            {
+                                // Right a message nowait is waiting for a message
+                                // but no one can be delivered it then need to return
+                                _incomingMessageLock.notify();
+                            }
+                        }
+                    }
+                }
+            }
+            else
+            {
+                _messageAsyncrhonouslyReceived++;
+                if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+                {
+                    // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
+                    resetAsynchMessageReceived();
+                }
+                // only deliver the message if it is valid 
+                if (messageOk)
+                {
+                    // This is an asynchronous message
+                    // tell the session that a message is in process
+                    getSession().preProcessMessage(message);
+                    // If the session is transacted we need to ack the message first
+                    // This is because a message is associated with its tx only when acked
+                    if (getSession().getTransacted())
+                    {
+                        getSession().acknowledgeMessage(message);
+                    }
+                    // The JMS specs says:
+                    /* The result of a listener throwing a RuntimeException depends on the session?s
+                    * acknowledgment mode.
+                    ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+                    * will be immediately redelivered. The number of times a JMS provider will
+                    * redeliver the same message before giving up is provider-dependent.
+                    ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+                    * --- Transacted Session - the next message for the listener is delivered.
+                    *
+                    * The number of time we try redelivering the message is 0
+                    **/
+                    try
+                    {
+                        _messageListener.onMessage(message.getJMSMessage());
+                    }
+                    catch (RuntimeException re)
+                    {
+                        // do nothing as this message will not be redelivered
+                    }
+                    // If the session has been recovered we then need to redelivered this message
+                    if (getSession().isInRecovery())
+                    {
+                        releaseMessage(message);
+                    }
+                    else if (!getSession().getTransacted())
+                    {
+                        // Tell the jms Session to ack this message if required
+                        getSession().acknowledgeMessage(message);
+                    }
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * Release a message
+     *
+     * @param message The message to be released
+     * @throws QpidException If the message cannot be released due to some internal error.
+     */
+    private void releaseMessage(QpidMessage message) throws QpidException
+    {
+        if (_preAcquire)
+        {
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+            getSession().getQpidSession().messageRelease(range);
+        }
+    }
+
+    /**
+     * Acquire a message
+     *
+     * @param message The message to be acquired
+     * @return true if the message has been acquired, false otherwise.
+     * @throws QpidException If the message cannot be acquired due to some internal error.
+     */
+    private boolean acquireMessage(QpidMessage message) throws QpidException
+    {
+        boolean result = false;
+        if (!_preAcquire)
+        {
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+
+            Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range);
+            if (rangeResult.length > 0)
+            {
+                result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Acknowledge a message
+     *
+     * @param message The message to be acknowledged
+     * @throws QpidException If the message cannot be acquired due to some internal error.
+     */
+    private void acknowledgeMessage(QpidMessage message) throws QpidException
+    {
+        if (!_preAcquire)
+        {
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+            getSession().getQpidSession().messageAcknowledge(range);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,316 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import javax.jms.*;
+
+/**
+ * Implements  MessageProducer
+ */
+public class MessageProducerImpl extends MessageActor implements MessageProducer
+{
+    /**
+     * If true, messages will not get a timestamp.
+     */
+    private boolean _disableTimestamps = false;
+
+    /**
+     * Priority of messages created by this producer.
+     */
+    private int _messagePriority = Message.DEFAULT_PRIORITY;
+
+    /**
+     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+     */
+    private long _timeToLive;
+
+    /**
+     * Delivery mode used for this producer.
+     */
+    private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+    /**
+     * Speicify whether the messageID is disable
+     */
+    private boolean _disableMessageId = false;
+
+    //-- constructors
+    public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
+    {
+        super(session, destination);
+    }
+
+    //--- Interface javax.jms.MessageProducer
+    /**
+     * Sets whether message IDs are disabled.
+     *
+     * @param value Specify whether the MessageID must be disabled
+     * @throws JMSException If disabling messageID fails due to some internal error.
+     */
+    public void setDisableMessageID(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableMessageId = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return true is messageID is disabled, false otherwise
+     * @throws JMSException If getting whether messagID is disabled fails due to some internal error.
+     */
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkNotClosed();
+        return _disableMessageId;
+    }
+
+    /**
+     * Sets whether message timestamps are disabled.
+     * <P> JMS spec says:
+     * <p> Since timestamps take some effort to create and increase a
+     * message's size, some JMS providers may be able to optimize message
+     * overhead if they are given a hint that the timestamp is not used by an
+     * application....
+     * these messages must have the timestamp set to zero; if the provider
+     * ignores the hint, the timestamp must be set to its normal value.
+     * <p>Message timestamps are enabled by default.
+     *
+     * @param value Indicates if message timestamps are disabled
+     * @throws JMSException if disabling the timestamps fails due to some internal error.
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableTimestamps = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
+     */
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkNotClosed();
+        return _disableTimestamps;
+    }
+
+    /**
+     * Sets the producer's default delivery mode.
+     * <p> JMS specification says:
+     * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
+     *
+     * @param deliveryMode The message delivery mode for this message producer; legal
+     *                     values are {@link DeliveryMode#NON_PERSISTENT}
+     *                     and {@link DeliveryMode#PERSISTENT}.
+     * @throws JMSException if setting the delivery mode fails due to some internal error.
+     */
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkNotClosed();
+        if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT))
+        {
+            throw new JMSException(
+                    "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
+        }
+        _deliveryMode = deliveryMode;
+    }
+
+    /**
+     * Gets the producer's delivery mode.
+     *
+     * @return The message delivery mode for this message producer
+     * @throws JMSException If getting the delivery mode fails due to some internal error.
+     */
+    public int getDeliveryMode() throws JMSException
+    {
+        checkNotClosed();
+        return _deliveryMode;
+    }
+
+    /**
+     * Sets the producer's message priority.
+     * <p> The jms spec says:
+     * <p> The JMS API defines ten levels of priority value, with 0 as the
+     * lowest priority and 9 as the highest. Clients should consider priorities
+     * 0-4 as gradations of normal priority and priorities 5-9 as gradations
+     * of expedited priority.
+     * <p> Priority is set to 4 by default.
+     *
+     * @param priority The message priority for this message producer; must be a value between 0 and 9
+     * @throws JMSException if setting this producer priority fails due to some internal error.
+     */
+    public void setPriority(int priority) throws JMSException
+    {
+        checkNotClosed();
+        if ((priority < 0) || (priority > 9))
+        {
+            throw new IllegalArgumentException(
+                    "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+        }
+        _messagePriority = priority;
+    }
+
+    /**
+     * Gets the producer's message priority.
+     *
+     * @return The message priority for this message producer.
+     * @throws JMSException If getting this producer message priority fails due to some internal error.
+     */
+    public int getPriority() throws JMSException
+    {
+        checkNotClosed();
+        return _messagePriority;
+    }
+
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     * <p> The JMS spec says that time to live must be set to zero by default.
+     *
+     * @param timeToLive The message time to live in milliseconds; zero is unlimited
+     * @throws JMSException If setting the default time to live fails due to some internal error.
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkNotClosed();
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        _timeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return The default message time to live in milliseconds; zero is unlimited
+     * @throws JMSException if getting the default time to live fails due to some internal error.
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
+    public long getTimeToLive() throws JMSException
+    {
+        checkNotClosed();
+        return _timeToLive;
+    }
+
+    /**
+     * Gets the destination associated with this producer.
+     *
+     * @return This producer's destination.
+     * @throws JMSException If getting the destination for this producer fails
+     *                      due to some internal error.
+     */
+    public Destination getDestination() throws JMSException
+    {
+        checkNotClosed();
+        return _destination;
+    }
+
+    /**
+     * Sends a message using the producer's default delivery mode, priority, destination
+     * and time to live.
+     *
+     * @param message the message to be sent
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
+    public void send(Message message) throws JMSException
+    {
+        send(message, _deliveryMode, _messagePriority, _timeToLive);
+    }
+
+    /**
+     * Sends a message to this producer default destination, specifying delivery mode,
+     * priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer's destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        send(_destination, message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Sends a message to a specified destination using this producer's default
+     * delivery mode, priority and time to live.
+     * <p/>
+     * <P>Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination The destination to send this message to
+     * @param message     The message to send
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public void send(Destination destination, Message message) throws JMSException
+    {
+        send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
+    }
+
+    /**
+     * Sends a message to a destination specifying delivery mode, priority and time to live.
+     *
+     * @param destination  The destination to send this message to.
+     * @param message      The message to be sent.
+     * @param deliveryMode The delivery mode to use.
+     * @param priority     The priority for this message.
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkNotClosed();
+        getSession().checkDestination(destination);
+        // Do not allow negative timeToLive values
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        // check that the message is not a foreign one
+
+        // set the properties
+
+        //
+
+        // dispatch it
+        // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,51 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception listner 
+ */
+public class QpidExceptionListenerImpl //implements ExceptionListener
+{
+    private javax.jms.ExceptionListener _jmsExceptionListener;
+
+    public QpidExceptionListenerImpl()
+    {
+    }
+
+    void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener)
+    {
+        _jmsExceptionListener = jmsExceptionListener;
+    }
+    //----- ExceptionListener API
+
+    public void onException(QpidException exception)
+    {
+        // convert this exception in a JMS exception
+        JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception);
+        // propagate to the jms exception listener
+        if (_jmsExceptionListener != null)
+        {
+            _jmsExceptionListener.onException(jmsException);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.MessageListener;
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session.
+ * This is for guarantying that asynch messages are sequentially processed within their session.
+ * <p> when used synchonously, messages are dispatched to the receiver itself.
+ */
+public class QpidMessageListener implements MessageListener
+{
+    /**
+     * Used for debugging.
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+    /**
+     * This message listener consumer
+     */
+    MessageConsumerImpl _consumer = null;
+
+    //---- constructor
+    /**
+     * Create a message listener wrapper for a given consumer
+     *
+     * @param consumer The consumer of this listener
+     */
+    public QpidMessageListener(MessageConsumerImpl consumer)
+    {
+        _consumer = consumer;
+    }
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listner.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //convert this message into a JMS one
+            QpidMessage jmsMessage = null; // todo
+            // if consumer is asynchronous then send this message to its session.
+            if( _consumer.getMessageListener() != null )
+            {
+                _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+            }
+            else
+            {
+                // deliver this message to the consumer itself
+                _consumer.onMessage(jmsMessage);
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import javax.jms.QueueBrowser;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import java.util.Enumeration;
+
+/**
+ * Implementation of the JMS QueueBrowser interface
+ */
+public class QueueBrowserImpl extends MessageActor implements QueueBrowser
+{
+    /**
+     * The browsers MessageSelector.
+     */
+    private String _messageSelector = null;
+
+    //--- constructor
+
+    /**
+     * Create a QueueBrowser for a specific queue and a given message selector.
+     *
+     * @param session         The session of this browser.
+     * @param queue           The queue name for this browser
+     * @param messageSelector only messages with properties matching the message selector expression are delivered.
+     * @throws JMSException In case of internal problem when creating this browser.
+     */
+    protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException
+    {
+        super(session, (DestinationImpl) queue);
+        _messageSelector = messageSelector;
+        //-- TODO: Create the QPid browser
+    }
+
+    //--- javax.jms.QueueBrowser API
+    /**
+     * Get an enumeration for browsing the current queue messages in the order they would be received.
+     *
+     * @return An enumeration for browsing the messages
+     * @throws JMSException If  getting the enumeration for this browser fails due to some internal error.
+     */
+    public Enumeration getEnumeration() throws JMSException
+    {
+        // TODO
+        return null;
+    }
+
+    /**
+     * Get the queue associated with this queue browser.
+     *
+     * @return The queue associated with this queue browser.
+     * @throws JMSException If getting the queue associated with this browser failts due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) _destination;
+    }
+
+    /**
+     * Get this queue browser's message selector expression.
+     *
+     * @return This queue browser's message selector, or null if no message selector exists.
+     * @throws JMSException if getting the message selector for this browser fails due to some internal error.
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        return _messageSelector;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,75 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
+import javax.jms.Queue;
+import javax.jms.JMSException;
+
+/**
+ * Implementation of the JMS Queue interface
+ */
+public class QueueImpl extends DestinationImpl implements Queue
+{
+
+    //--- Constructor
+    /**
+     * Create a new QueueImpl with a given name.
+     *
+     * @param name    The name of this queue.
+     * @param session The session used to create this queue.
+     * @throws QpidException If the queue name is not valid
+     */
+    protected QueueImpl(SessionImpl session, String name) throws QpidException
+    {
+        super(session, name);
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+        _queueName = name;
+        // check that this queue exist on the server
+        // As pasive is set the server will not create the queue.
+        session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE);
+    }
+
+    /**
+     * Create a destiantion from a binding URL
+     *
+     * @param session  The session used to create this queue.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(session, binding);        
+    }
+
+    //---- Interface javax.jms.Queue
+    /**
+     * Gets the name of this queue.
+     *
+     * @return This queue's name.
+     */
+    public String getQueueName() throws JMSException
+    {
+        return super.getName();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import javax.jms.QueueReceiver;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Implements javax.jms.QueueReceiver
+ */
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+    //--- Constructor
+    /**
+     * create a new QueueReceiverImpl.
+     *
+     * @param session         The session from which the QueueReceiverImpl is instantiated.
+     * @param queue           The default queue for this QueueReceiverImpl.
+     * @param messageSelector the message selector for this QueueReceiverImpl.  
+     * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
+     */
+    protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
+    {
+        super(session, (DestinationImpl) queue, messageSelector, false, null);
+    }
+
+    //--- Interface  QueueReceiver
+    /**
+     * Get the Queue associated with this queue receiver.
+     *
+     * @return this receiver's Queue
+     * @throws JMSException If getting the queue for this queue receiver fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        checkNotClosed();
+        return (QueueImpl) _destination;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,131 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import javax.jms.QueueSender;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Message;
+
+/**
+ * Implements javax.jms.QueueSender
+ */
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+    //--- Constructor
+    /**
+     * Create a new QueueSenderImpl.
+     *
+     * @param session the session from which the QueueSenderImpl is instantiated
+     * @param queue   the default queue for this QueueSenderImpl
+     * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error.
+     */
+    protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException
+    {
+        super(session, queue);
+    }
+
+    //--- Interface javax.jms.QueueSender
+    /**
+     * Get the queue associated with this QueueSender.
+     *
+     * @return This QueueSender's queue
+     * @throws JMSException If getting the queue for this QueueSender fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) getDestination();
+    }
+
+    /**
+     * Sends a message to the queue. Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+     * and time to live.
+     *
+     * @param message The message to send.
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message) throws JMSException
+    {
+        super.send(message);
+    }
+
+    /**
+     * Send a message to the queue, specifying delivery mode, priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     *  
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Send a message to a queue for an unidentified message producer.
+     * Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+     * and time to live.
+     *
+     * @param queue   The queue to send this message to
+     * @param message The message to send
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message) throws JMSException
+    {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     *
+     * @param queue        The queue to send this message to
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java?view=auto&rev=563134
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java Mon Aug  6 06:54:05 2007
@@ -0,0 +1,152 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+/**
+ * Implementation of javax.jms.QueueSession
+ */
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+    //--- constructor
+    /**
+     * Create a JMS Session
+     *
+     * @param connection      The ConnectionImpl object from which the Session is created.
+     * @param transacted      Indicates if the session transacted.
+     * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+     *                        {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code>
+     *                        parameter is true.
+     * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+     * @throws javax.jms.JMSException         In case of internal error.
+     */
+    protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        super(connection, transacted, acknowledgeMode);
+    }
+
+    //-- Overwritten methods
+    /**
+     * Creates a durable subscriber to the specified topic,
+     *
+     * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+     * @param name  The name used to identify this subscription.
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession");
+    }
+
+    /**
+     * Create a TemporaryTopic.
+     *
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession");
+    }
+
+    /**
+     * Creates a topic identity given a Topicname.
+     *
+     * @param topicName The name of this <CODE>Topic</CODE>
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createTopic from QueueSession");
+    }
+
+    /**
+     * Unsubscribes a durable subscription that has been created by a client.
+     *
+     * @param name the name used to identify this subscription
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public void unsubscribe(String name) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession");
+    }
+
+    //--- Interface javax.jms.QueueSession
+    /**
+     * Create a QueueReceiver to receive messages from the specified queue.
+     *
+     * @param queue the <CODE>Queue</CODE> to access
+     * @return A QueueReceiver
+     * @throws JMSException                If creating a receiver fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     */
+    public QueueReceiver createReceiver(Queue queue) throws JMSException
+    {
+        return createReceiver(queue, null);
+    }
+
+    /**
+     * Create a QueueReceiver to receive messages from the specified queue for a given message selector.
+     *
+     * @param queue           the Queue to access
+     * @param messageSelector A value of null or an empty string indicates that
+     *                        there is no message selector for the message consumer.
+     * @return A QueueReceiver
+     * @throws JMSException                If creating a receiver fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(queue);
+        QueueReceiver receiver;
+        try
+        {
+            receiver =  new QueueReceiverImpl(this, queue, messageSelector);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return receiver;
+    }
+
+    /**
+     * Create a QueueSender object to send messages to the specified queue.
+     *
+     * @param queue the Queue to access, or null if this is an unidentified producer
+     * @return A QueueSender
+     * @throws JMSException                If creating the sender fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     */
+    public QueueSender createSender(Queue queue) throws JMSException
+    {
+        checkNotClosed();
+        // we do not check the destination since unidentified producers are allowed (no default destination).
+        return new QueueSenderImpl(this, (QueueImpl) queue);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native