You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC

svn commit: r577253 [3/7] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: nclient/ nclient/impl/ nclient/util/ njms/ njms/message/

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,662 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.njms.message.MessageFactory;
+import org.apache.qpidity.njms.message.QpidMessage;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+
+/**
+ * Implementation of JMS message consumer
+ */
+public class MessageConsumerImpl extends MessageActor
+        implements MessageConsumer, org.apache.qpidity.nclient.util.MessageListener
+{
+    // we can receive up to 100 messages for an asynchronous listener
+    public static final int MAX_MESSAGE_TRANSFERRED = 100;
+
+    /**
+     * This MessageConsumer's messageselector.
+     */
+    private String _messageSelector = null;
+
+    /**
+     * The message selector filter associated with this consumer message selector
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * NoLocal
+     * If true, and the destination is a topic then inhibits the delivery of messages published
+     * by its own connection.  The behavior for NoLocal is not specified if the destination is a queue.
+     */
+    protected boolean _noLocal;
+
+    /**
+     * The subscription name
+     */
+    protected String _subscriptionName;
+
+    /**
+     * Indicates whether this consumer receives pre-acquired messages
+     */
+    private boolean _preAcquire = true;
+
+    /**
+     * A MessagePartListener set up for this consumer.
+     */
+    private MessageListener _messageListener;
+
+    /**
+     * A lcok on the syncrhonous message
+     */
+    private final Object _incomingMessageLock = new Object();
+
+
+    /**
+     * Number of mesages received asynchronously
+     * Nether exceed MAX_MESSAGE_TRANSFERRED
+     */
+    private int _messageAsyncrhonouslyReceived = 0;
+
+    private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
+
+    //----- Constructors
+    /**
+     * Create a new MessageProducerImpl.
+     *
+     * @param session          The session from which the MessageProducerImpl is instantiated
+     * @param destination      The default destination for this MessageProducerImpl
+     * @param messageSelector  The message selector for this QueueReceiverImpl.
+     * @param noLocal          If true inhibits the delivery of messages published by its own connection.
+     * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+     *                         If this value is null, a non-durable subscription is created.
+     * @param consumerTag      Thi actor ID
+     * @throws Exception If the MessageProducerImpl cannot be created due to some internal error.
+     */
+    protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
+                                  boolean noLocal, String subscriptionName, String consumerTag) throws Exception
+    {
+        super(session, destination, consumerTag);
+        if (messageSelector != null)
+        {
+            _messageSelector = messageSelector;
+            _filter = new JMSSelectorFilter(messageSelector);
+        }
+        _noLocal = noLocal;
+        _subscriptionName = subscriptionName;
+        _isStopped = getSession().isStopped();
+        // let's create a message part assembler
+
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
+
+        if (destination instanceof Queue)
+        {
+            // this is a queue we expect that this queue exists
+            getSession().getQpidSession()
+                    .messageSubscribe(destination.getQpidQueueName(), // queue
+                                      getMessageActorID(), // destination
+                                      org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                      // When the message selctor is set we do not acquire the messages
+                                      _messageSelector != null ? org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                      messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
+            if (_messageSelector != null)
+            {
+                _preAcquire = false;
+            }
+        }
+        else
+        {
+            // this is a topic we need to create a temporary queue for this consumer
+            // unless this is a durable subscriber
+            String queueName;
+            if (subscriptionName != null)
+            {
+                // this ia a durable subscriber
+                // create a persistent queue for this subscriber
+                queueName = "topic-" + subscriptionName;
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
+            }
+            else
+            {
+                // this is a non durable subscriber
+                queueName = destination.getQpidQueueName();
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
+            }
+            // bind this queue with the topic exchange
+            getSession().getQpidSession()
+                    .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
+            // subscribe to this topic 
+            getSession().getQpidSession()
+                    .messageSubscribe(queueName, getMessageActorID(),
+                                      org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                      // We always acquire the messages
+                                      org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                      messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+                                      // Request exclusive subscription access, meaning only this subscription
+                                      // can access the queue.
+                                      Option.EXCLUSIVE);
+
+        }
+        // set the flow mode
+        getSession().getQpidSession()
+                .messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
+
+        // this will prevent the broker from sending more than one message
+        // When a messageListener is set the flow will be adjusted.
+        // until then we assume it's for synchronous message consumption
+        requestCredit(1);
+        requestSync();
+        // check for an exception
+        if (getSession().getCurrentException() != null)
+        {
+            throw getSession().getCurrentException();
+        }
+    }
+
+    //----- Message consumer API
+    /**
+     * Gets this  MessageConsumer's message selector.
+     *
+     * @return This MessageConsumer's message selector, or null if no
+     *         message selector exists for the message consumer (that is, if
+     *         the message selector was not set or was set to null or the
+     *         empty string)
+     * @throws JMSException if getting the message selector fails due to some internal error.
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        checkNotClosed();
+        return _messageSelector;
+    }
+
+    /**
+     * Gets this MessageConsumer's <CODE>MessagePartListener</CODE>.
+     *
+     * @return The listener for the MessageConsumer, or null if no listener is set
+     * @throws JMSException if getting the message listener fails due to some internal error.
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        checkNotClosed();
+        return _messageListener;
+    }
+
+    /**
+     * Sets the MessageConsumer's <CODE>MessagePartListener</CODE>.
+     * <p> The JMS specification says:
+     * <P>Setting the message listener to null is the equivalent of
+     * unsetting the message listener for the message consumer.
+     * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
+     * while messages are being consumed by an existing listener
+     * or the consumer is being used to consume messages synchronously
+     * is undefined.
+     *
+     * @param messageListener The listener to which the messages are to be delivered
+     * @throws JMSException If setting the message listener fails due to some internal error.
+     */
+    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
+    {
+        // this method is synchronized as onMessage also access _messagelistener
+        // onMessage, getMessageListener and this method are the only synchronized methods
+        checkNotClosed();
+        try
+        {
+            _messageListener = messageListener;
+            if (messageListener != null)
+            {
+                resetAsynchMessageReceived();
+            }
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages
+     *
+     * @throws QpidException If there is a communication error
+     */
+    private void resetAsynchMessageReceived() throws QpidException
+    {
+        if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+        {
+            getSession().getQpidSession().messageStop(getMessageActorID());
+        }
+        _messageAsyncrhonouslyReceived = 0;
+        requestCredit(MAX_MESSAGE_TRANSFERRED);
+    }
+
+    /**
+     * Receive the next message produced for this message consumer.
+     * <P>This call blocks indefinitely until a message is produced or until this message consumer is closed.
+     *
+     * @return The next message produced for this message consumer, or
+     *         null if this message consumer is concurrently closed
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receive() throws JMSException
+    {
+        // Check if we can get a message immediately
+        Message result;
+        result = receiveNoWait();
+        if (result != null)
+        {
+            return result;
+        }
+        try
+        {
+            // Now issue a credit and wait for the broker to send a message
+            // IMO no point doing a credit() flush() and sync() in a loop.
+            // This will only overload the broker. After the initial try we can wait
+            // for the broker to send a message when it gets one
+            requestCredit(1);
+            return (Message) _queue.take();
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Receive the next message that arrives within the specified timeout interval.
+     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+     * is closed.
+     * <p> A timeout of zero never expires, and the call blocks indefinitely.
+     * <p> A timeout less than 0 throws a JMSException.
+     *
+     * @param timeout The timeout value (in milliseconds)
+     * @return The next message that arrives within the specified timeout interval.
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receive(long timeout) throws JMSException
+    {
+        checkClosed();
+        checkIfListenerSet();
+        if (timeout < 0)
+        {
+            throw new JMSException("Invalid timeout value: " + timeout);
+        }
+
+        Message result;
+        try
+        {
+            // first check if we have any in the queue already
+            result = (Message) _queue.poll();
+            if (result == null)
+            {
+                requestCredit(1);
+                requestFlush();
+                // We shouldn't do a sync(). Bcos the timeout can happen
+                // before the sync() returns
+                return (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS);
+            }
+            else
+            {
+                return result;
+            }
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Receive the next message if one is immediately available.
+     *
+     * @return the next message or null if one is not available.
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receiveNoWait() throws JMSException
+    {
+        checkClosed();
+        checkIfListenerSet();
+        Message result;
+        try
+        {
+            // first check if we have any in the queue already
+            result = (Message) _queue.poll();
+            if (result == null)
+            {
+                requestCredit(1);
+                requestFlush();
+                requestSync();
+                return (Message) _queue.poll();
+            }
+            else
+            {
+                return result;
+            }
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    // not public methods
+    /**
+     * Upon receipt of this method, the broker adds "value"
+     * number of messages to the available credit balance for this consumer.
+     *
+     * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
+     */
+    private void requestCredit(int value)
+    {
+        getSession().getQpidSession()
+                .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, value);
+    }
+
+    /**
+     * Forces the broker to exhaust its credit supply.
+     * <p> The broker's credit will always be zero when
+     * this method completes.
+     */
+    private void requestFlush()
+    {
+        getSession().getQpidSession().messageFlush(getMessageActorID());
+    }
+
+    /**
+     * Sync method will block until all outstanding broker
+     * commands
+     * are executed.
+     */
+    private void requestSync()
+    {
+        getSession().getQpidSession().sync();
+    }
+
+    /**
+     * Check whether this consumer is closed.
+     *
+     * @throws JMSException If this consumer is closed.
+     */
+    private void checkClosed() throws JMSException
+    {
+        if (_isStopped)
+        {
+            throw new JMSException("Session is closed");
+        }
+    }
+
+    /**
+     * Stop the delivery of messages to this consumer.
+     * <p>For asynchronous receiver, this operation blocks until the message listener
+     * finishes processing the current message,
+     *
+     * @throws Exception If the consumer cannot be stopped due to some internal error.
+     */
+    protected void stop() throws Exception
+    {
+        getSession().getQpidSession().messageStop(getMessageActorID());
+        _isStopped = true;
+    }
+
+    /**
+     * Start the delivery of messages to this consumer.
+     *
+     * @throws Exception If the consumer cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+        synchronized (_incomingMessageLock)
+        {
+            _isStopped = false;
+        }
+    }
+
+    /**
+     * This method notifies this consumer that a message has been delivered
+     * @param message The received message.
+     */
+    public void onMessage(org.apache.qpidity.api.Message message)
+    {
+        try
+        {
+            QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+            if (checkPreConditions(jmsMessage))
+            {
+                preApplicationProcessing(jmsMessage);
+
+                if (_messageListener == null)
+                {
+                    _queue.offer(jmsMessage);
+                }
+                else
+                {
+                    // I still think we don't need that additional thread in SessionImpl
+                    // if the Application blocks on a message thats fine
+                    // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
+                    notifyMessageListener(jmsMessage);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+
+    public void notifyMessageListener(QpidMessage message) throws RuntimeException
+    {
+        try
+        {
+            _messageAsyncrhonouslyReceived++;
+            if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+            {
+                // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
+                resetAsynchMessageReceived();
+            }
+
+            // The JMS specs says:
+            /* The result of a listener throwing a RuntimeException depends on the session?s
+            * acknowledgment mode.
+            ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+            * will be immediately redelivered. The number of times a JMS provider will
+            * redeliver the same message before giving up is provider-dependent.
+            ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+            * --- Transacted Session - the next message for the listener is delivered.
+            *
+            * The number of time we try redelivering the message is 0
+            **/
+            try
+            {
+
+                _messageListener.onMessage((Message) message);
+            }
+            catch (RuntimeException re)
+            {
+                // do nothing as this message will not be redelivered
+            }
+
+
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * Check whether this consumer is asynchronous
+     *
+     * @throws javax.jms.IllegalStateException If this consumer is asynchronous.
+     */
+    private void checkIfListenerSet() throws javax.jms.IllegalStateException
+    {
+
+        if (_messageListener != null)
+        {
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
+        }
+    }
+
+    /**
+     * pre process a received message.
+     *
+     * @param message The message to pre-process.
+     * @throws Exception If the message  cannot be pre-processed due to some internal error.
+     */
+    private void preApplicationProcessing(QpidMessage message) throws Exception
+    {
+        getSession().preProcessMessage(message);
+        // If the session is transacted we need to ack the message first
+        // This is because a message is associated with its tx only when acked
+        if (getSession().getTransacted())
+        {
+            getSession().acknowledgeMessage(message);
+        }
+        message.afterMessageReceive();
+    }
+
+    /**
+     * Check whether a message can be delivered to this consumer.
+     *
+     * @param message The message to be checked.
+     * @return true if the message matches the selector and can be acquired, false otherwise.
+     * @throws QpidException If the message preConditions cannot be checked due to some internal error.
+     */
+    private boolean checkPreConditions(QpidMessage message) throws QpidException
+    {
+        boolean messageOk = true;
+        if (_messageSelector != null)
+        {
+            messageOk = _filter.matches((Message) message);
+        }
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("messageOk " + messageOk);
+            _logger.debug("_preAcquire " + _preAcquire);
+        }
+        if (!messageOk && _preAcquire)
+        {
+            // this is the case for topics
+            // We need to ack this message
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to ack message");
+            }
+            acknowledgeMessage(message);
+        }
+        else if (!messageOk)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Message not OK, releasing");
+            }
+            releaseMessage(message);
+        }
+        // now we need to acquire this message if needed
+        // this is the case of queue with a message selector set
+        if (!_preAcquire && messageOk)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to acquire message");
+            }
+            messageOk = acquireMessage(message);
+        }
+        return messageOk;
+    }
+
+    /**
+     * Release a message
+     *
+     * @param message The message to be released
+     * @throws QpidException If the message cannot be released due to some internal error.
+     */
+    private void releaseMessage(QpidMessage message) throws QpidException
+    {
+        if (_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+            getSession().getQpidSession().messageRelease(ranges);
+            getSession().testQpidException();
+        }
+    }
+
+    /**
+     * Acquire a message
+     *
+     * @param message The message to be acquired
+     * @return true if the message has been acquired, false otherwise.
+     * @throws QpidException If the message cannot be acquired due to some internal error.
+     */
+    private boolean acquireMessage(QpidMessage message) throws QpidException
+    {
+        boolean result = false;
+        if (!_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+
+            getSession().getQpidSession()
+                    .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+            getSession().getQpidSession().sync();
+            RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
+            if (acquired.size() > 0)
+            {
+                result = true;
+            }
+            getSession().testQpidException();
+        }
+        return result;
+    }
+
+    /**
+     * Acknowledge a message
+     *
+     * @param message The message to be acknowledged
+     * @throws QpidException If the message cannot be acquired due to some internal error.
+     */
+    private void acknowledgeMessage(QpidMessage message) throws QpidException
+    {
+        if (!_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+            getSession().getQpidSession().messageAcknowledge(ranges);
+            getSession().testQpidException();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,384 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.njms.message.MessageHelper;
+import org.apache.qpidity.njms.message.MessageImpl;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.*;
+import java.util.UUID;
+import java.io.IOException;
+
+/**
+ * Implements  MessageProducer
+ */
+public class MessageProducerImpl extends MessageActor implements MessageProducer
+{
+    /**
+     * If true, messages will not get a timestamp.
+     */
+    private boolean _disableTimestamps = false;
+
+    /**
+     * Priority of messages created by this producer.
+     */
+    private int _messagePriority = Message.DEFAULT_PRIORITY;
+
+    /**
+     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+     */
+    private long _timeToLive;
+
+    /**
+     * Delivery mode used for this producer.
+     */
+    private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+    /**
+     * Speicify whether the messageID is disable
+     */
+    private boolean _disableMessageId = false;
+
+    //-- constructors
+    public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
+    {
+        super(session, destination,"");
+    }
+
+    //--- Interface javax.njms.MessageProducer
+    /**
+     * Sets whether message IDs are disabled.
+     *
+     * @param value Specify whether the MessageID must be disabled
+     * @throws JMSException If disabling messageID fails due to some internal error.
+     */
+    public void setDisableMessageID(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableMessageId = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return true is messageID is disabled, false otherwise
+     * @throws JMSException If getting whether messagID is disabled fails due to some internal error.
+     */
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkNotClosed();
+        return _disableMessageId;
+    }
+
+    /**
+     * Sets whether message timestamps are disabled.
+     * <P> JMS spec says:
+     * <p> Since timestamps take some effort to create and increase a
+     * message's size, some JMS providers may be able to optimize message
+     * overhead if they are given a hint that the timestamp is not used by an
+     * application....
+     * these messages must have the timestamp set to zero; if the provider
+     * ignores the hint, the timestamp must be set to its normal value.
+     * <p>Message timestamps are enabled by default.
+     *
+     * @param value Indicates if message timestamps are disabled
+     * @throws JMSException if disabling the timestamps fails due to some internal error.
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableTimestamps = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
+     */
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkNotClosed();
+        return _disableTimestamps;
+    }
+
+    /**
+     * Sets the producer's default delivery mode.
+     * <p> JMS specification says:
+     * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
+     *
+     * @param deliveryMode The message delivery mode for this message producer; legal
+     *                     values are {@link DeliveryMode#NON_PERSISTENT}
+     *                     and {@link DeliveryMode#PERSISTENT}.
+     * @throws JMSException if setting the delivery mode fails due to some internal error.
+     */
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkNotClosed();
+        if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT))
+        {
+            throw new JMSException(
+                    "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
+        }
+        _deliveryMode = deliveryMode;
+    }
+
+    /**
+     * Gets the producer's delivery mode.
+     *
+     * @return The message delivery mode for this message producer
+     * @throws JMSException If getting the delivery mode fails due to some internal error.
+     */
+    public int getDeliveryMode() throws JMSException
+    {
+        checkNotClosed();
+        return _deliveryMode;
+    }
+
+    /**
+     * Sets the producer's message priority.
+     * <p> The njms spec says:
+     * <p> The JMS API defines ten levels of priority value, with 0 as the
+     * lowest priority and 9 as the highest. Clients should consider priorities
+     * 0-4 as gradations of normal priority and priorities 5-9 as gradations
+     * of expedited priority.
+     * <p> Priority is set to 4 by default.
+     *
+     * @param priority The message priority for this message producer; must be a value between 0 and 9
+     * @throws JMSException if setting this producer priority fails due to some internal error.
+     */
+    public void setPriority(int priority) throws JMSException
+    {
+        checkNotClosed();
+        if ((priority < 0) || (priority > 9))
+        {
+            throw new IllegalArgumentException(
+                    "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+        }
+        _messagePriority = priority;
+    }
+
+    /**
+     * Gets the producer's message priority.
+     *
+     * @return The message priority for this message producer.
+     * @throws JMSException If getting this producer message priority fails due to some internal error.
+     */
+    public int getPriority() throws JMSException
+    {
+        checkNotClosed();
+        return _messagePriority;
+    }
+
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     * <p> The JMS spec says that time to live must be set to zero by default.
+     *
+     * @param timeToLive The message time to live in milliseconds; zero is unlimited
+     * @throws JMSException If setting the default time to live fails due to some internal error.
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkNotClosed();
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        _timeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return The default message time to live in milliseconds; zero is unlimited
+     * @throws JMSException if getting the default time to live fails due to some internal error.
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
+    public long getTimeToLive() throws JMSException
+    {
+        checkNotClosed();
+        return _timeToLive;
+    }
+
+    /**
+     * Gets the destination associated with this producer.
+     *
+     * @return This producer's destination.
+     * @throws JMSException If getting the destination for this producer fails
+     *                      due to some internal error.
+     */
+    public Destination getDestination() throws JMSException
+    {
+        checkNotClosed();
+        return _destination;
+    }
+
+    /**
+     * Sends a message using the producer's default delivery mode, priority, destination
+     * and time to live.
+     *
+     * @param message the message to be sent
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
+    public void send(Message message) throws JMSException
+    {
+        send(message, _deliveryMode, _messagePriority, _timeToLive);
+    }
+
+    /**
+     * Sends a message to this producer default destination, specifying delivery mode,
+     * priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer's destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        send(_destination, message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Sends a message to a specified destination using this producer's default
+     * delivery mode, priority and time to live.
+     * <p/>
+     * <P>Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination The destination to send this message to
+     * @param message     The message to send
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public void send(Destination destination, Message message) throws JMSException
+    {
+        send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
+    }
+
+    /**
+     * Sends a message to a destination specifying delivery mode, priority and time to live.
+     *
+     * @param destination  The destination to send this message to.
+     * @param message      The message to be sent.
+     * @param deliveryMode The delivery mode to use.
+     * @param priority     The priority for this message.
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkNotClosed();
+        getSession().checkDestination(destination);
+        // Do not allow negative timeToLive values
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        // Only get current time if required
+        long currentTime = Long.MIN_VALUE;
+        if (!((timeToLive == 0) && _disableTimestamps))
+        {
+            currentTime = System.currentTimeMillis();
+        }
+        // the messae UID
+        String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
+        MessageImpl qpidMessage;
+        // check that the message is not a foreign one
+        try
+        {
+            qpidMessage = (MessageImpl) message;
+        }
+        catch (ClassCastException cce)
+        {
+            // this is a foreign message
+            qpidMessage = MessageHelper.transformMessage(message);
+            // set message's properties in case they are queried after send.
+            message.setJMSDestination(destination);
+            message.setJMSDeliveryMode(deliveryMode);
+            message.setJMSPriority(priority);
+            message.setJMSMessageID(uid);
+            if (timeToLive != 0)
+            {
+                message.setJMSExpiration(timeToLive + currentTime);
+                _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            }
+            else
+            {
+                message.setJMSExpiration(timeToLive);
+            }
+            message.setJMSTimestamp(currentTime);
+        }
+        // set the message properties
+        qpidMessage.setJMSDestination(destination);
+        qpidMessage.setJMSMessageID(uid);
+        qpidMessage.setJMSDeliveryMode(deliveryMode);
+        qpidMessage.setJMSPriority(priority);
+        if (timeToLive != 0)
+        {
+            qpidMessage.setJMSExpiration(timeToLive + currentTime);
+        }
+        else
+        {
+            qpidMessage.setJMSExpiration(timeToLive);
+        }
+        qpidMessage.setJMSTimestamp(currentTime);
+        qpidMessage.setRoutingKey(((DestinationImpl) destination).getDestinationName());
+        qpidMessage.setExchangeName(((DestinationImpl) destination).getExchangeName());
+        // call beforeMessageDispatch
+        try
+        {
+            qpidMessage.beforeMessageDispatch();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        try
+        {
+            getSession().getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
+                                                          qpidMessage.getQpidityMessage(),
+                                                          org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                                          org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        }
+        catch (IOException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.util.MessageListener;
+
+/**
+ * This listener idspatches messaes to its browser.
+ */
+public class QpidBrowserListener implements MessageListener
+{
+    /**
+     * Used for debugging.
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+    /**
+     * This message listener's browser.
+     */
+    QueueBrowserImpl _browser = null;
+
+      //---- constructor
+    /**
+     * Create a message listener wrapper for a given browser
+     *
+     * @param browser The browser of this listener
+     */
+    public QpidBrowserListener(QueueBrowserImpl browser)
+    {
+        _browser = browser;
+    }
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listner.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //convert this message into a JMS one
+            javax.jms.Message jmsMessage = null; // todo
+            _browser.receiveMessage(jmsMessage);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,51 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception listner 
+ */
+public class QpidExceptionListenerImpl //implements ExceptionListener
+{
+    private javax.jms.ExceptionListener _jmsExceptionListener;
+
+    public QpidExceptionListenerImpl()
+    {
+    }
+
+    void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener)
+    {
+        _jmsExceptionListener = jmsExceptionListener;
+    }
+    //----- ExceptionListener API
+
+    public void onException(QpidException exception)
+    {
+        // convert this exception in a JMS exception
+        JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception);
+        // propagate to the njms exception listener
+        if (_jmsExceptionListener != null)
+        {
+            _jmsExceptionListener.onException(jmsException);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,256 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+
+/**
+ * Implementation of the JMS QueueBrowser interface
+ */
+public class QueueBrowserImpl extends MessageActor implements QueueBrowser
+{
+    /**
+     * The browsers MessageSelector.
+     */
+    private String _messageSelector = null;
+
+    /**
+     * The message selector filter associated with this browser
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * The batch of messages to browse.
+     */
+    private Message[] _messages;
+
+    /**
+     * The number of messages read from current batch.
+     */
+    private int _browsed = 0;
+
+    /**
+     * The number of messages received from current batch.
+     */
+    private int _received = 0;
+
+    /**
+     * Indicates whether the last message has been received.
+     */
+    private int _batchLength;
+
+    /**
+     * The batch max size
+     */
+    private final int _maxbatchlength = 10;
+
+    //--- constructor
+
+    /**
+     * Create a QueueBrowser for a specific queue and a given message selector.
+     *
+     * @param session         The session of this browser.
+     * @param queue           The queue name for this browser
+     * @param messageSelector only messages with properties matching the message selector expression are delivered.
+     * @throws Exception In case of internal problem when creating this browser.
+     */
+    protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
+    {
+        super(session, (DestinationImpl) queue,consumerTag);
+        // this is an array representing a batch of messages for this browser.
+        _messages = new Message[_maxbatchlength];
+        if (messageSelector != null)
+        {
+            _messageSelector = messageSelector;
+            _filter = new JMSSelectorFilter(messageSelector);
+        }
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidBrowserListener(this));
+        // this is a queue we expect that this queue exists
+        getSession().getQpidSession()
+                .messageSubscribe(queue.getQueueName(), getMessageActorID(),
+                                  org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                  // We do not acquire those messages
+                                  org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+
+    }
+
+    //--- javax.njms.QueueBrowser API
+    /**
+     * Get an enumeration for browsing the current queue messages in the order they would be received.
+     *
+     * @return An enumeration for browsing the messages
+     * @throws JMSException If  getting the enumeration for this browser fails due to some internal error.
+     */
+    public Enumeration getEnumeration() throws JMSException
+    {
+        requestMessages();
+        return new MessageEnumeration();
+    }
+
+
+    /**
+     * Get the queue associated with this queue browser.
+     *
+     * @return The queue associated with this queue browser.
+     * @throws JMSException If getting the queue associated with this browser failts due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        checkNotClosed();
+        return (Queue) _destination;
+    }
+
+    /**
+     * Get this queue browser's message selector expression.
+     *
+     * @return This queue browser's message selector, or null if no message selector exists.
+     * @throws JMSException if getting the message selector for this browser fails due to some internal error.
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        checkNotClosed();
+        return _messageSelector;
+    }
+
+    //-- overwritten methods.  
+    /**
+     * Closes the browser and deregister it from its session.
+     *
+     * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        synchronized (_messages)
+        {
+            _received = 0;
+            _browsed = 0;
+            _batchLength = 0;
+            _messages.notify();
+        }
+        super.close();
+    }
+
+    //-- nonpublic methods
+    /**
+     * Request _maxbatchlength messages
+     *
+     * @throws JMSException If requesting more messages fails due to some internal error.
+     */
+    private void requestMessages() throws JMSException
+    {
+        _browsed = 0;
+        _received = 0;
+        // request messages
+        int received = 0;
+        getSession().getQpidSession()
+        .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                     _maxbatchlength);
+        _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
+    }
+
+    /**
+     * This method is invoked by the listener when a message is dispatched to this browser.
+     *
+     * @param m A received message
+     */
+    protected void receiveMessage(Message m)
+    {
+        synchronized (_messages)
+        {
+            _messages[_received] = m;
+            _received++;
+            _messages.notify();
+        }
+    }
+
+    //-- inner class
+    /**
+     * This is an implementation of the Enumeration interface.
+     */
+    private class MessageEnumeration implements Enumeration
+    {
+        /*
+        * Whether this enumeration has any more elements.
+        *
+        * @return True if there any more elements.
+        */
+        public boolean hasMoreElements()
+        {
+            boolean result = false;
+            // Try to work out whether there are any more messages available.
+            try
+            {
+                if (_browsed >= _maxbatchlength)
+                {
+                    requestMessages();
+                }
+                synchronized (_messages)
+                {
+                    while (_received == _browsed && _batchLength > _browsed)
+                    {
+                        // we expect more messages
+                        _messages.wait();
+                    }
+                    if (_browsed < _received && _batchLength != _browsed)
+                    {
+                        result = true;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                // If no batch could be returned, the result should be false, therefore do nothing
+            }
+            return result;
+        }
+
+        /**
+         * Get the next message element
+         *
+         * @return The next element.
+         */
+        public Object nextElement()
+        {
+            if (hasMoreElements())
+            {
+                synchronized (_messages)
+                {
+                    Message message = _messages[_browsed];
+                    _browsed = _browsed + 1;
+                    return message;
+                }
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.njms.ConnectionImpl;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.QueueConnection;
+
+/**
+ *
+ *  Implements javax.njms.QueueConnection
+ */
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+    //-- constructor
+      public QueueConnectionImpl(String host,int port,String virtualHost,String username,String password) throws                                                                                                     QpidException
+    {
+        super(host, port, virtualHost, username, password);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,136 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
+import javax.jms.Queue;
+import javax.jms.JMSException;
+
+/**
+ * Implementation of the JMS Queue interface
+ */
+public class QueueImpl extends DestinationImpl implements Queue
+{
+    //--- Constructor    
+    /**
+     * Create a new QueueImpl with a given name.
+     *
+     * @param name    The name of this queue.
+     * @param session The session used to create this queue.
+     * @throws QpidException If the queue name is not valid
+     */
+    protected QueueImpl(SessionImpl session, String name) throws QpidException
+    {
+        super(name);
+        _queueName = name;
+        _destinationName = name;
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+        _isAutoDelete = false;
+        _isDurable = true;
+        _isExclusive = false;
+        registerQueue(session, false);
+    }
+
+    /**
+     * Create a destiantion from a binding URL
+     *
+     * @param session The session used to create this queue.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(binding);
+        registerQueue(session, false);
+    }
+
+    /**
+     * Create a destiantion from a binding URL
+     *
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    public QueueImpl(BindingURL binding) throws QpidException
+    {
+        super(binding);
+    }
+
+    /**
+     * Create a new QueueImpl with a given name.
+     *
+     * @param name The name of this queue.
+     * @throws QpidException If the queue name is not valid
+     */
+    public QueueImpl(String name) throws QpidException
+    {
+        super(name);
+        _queueName = name;
+        _destinationName = name;
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+        _isAutoDelete = false;
+        _isDurable = true;
+        _isExclusive = false;
+    }
+
+    //---- Interface javax.njms.Queue
+    /**
+     * Gets the name of this queue.
+     *
+     * @return This queue's name.
+     */
+    public String getQueueName() throws JMSException
+    {
+        return _queueName;
+    }
+
+    //---Private methods
+    /**
+     * Check that this queue exists and declare it if required.
+     *
+     * @param session The session used to create this destination
+     * @param declare Specify whether the queue should be declared
+     * @throws QpidException If this queue does not exists on the broker.
+     */
+    protected void registerQueue(SessionImpl session, boolean declare) throws QpidException
+    {
+        // test if this exchange exist on the broker
+        //todo we can also specify if the excahnge is autodlete and durable
+        session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
+        // wait for the broker response
+        session.getQpidSession().sync();
+        // If this exchange does not exist then we will get an Expection 404 does notexist
+        //todo check for an execption
+        // now check if the queue exists
+        session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION,
+                                              _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
+                                              _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION,
+                                              declare ? Option.PASSIVE : Option.NO_OPTION);
+        // wait for the broker response
+        session.getQpidSession().sync();
+        // If this queue does not exist then we will get an Expection 404 does notexist
+        session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null);
+        // we don't have to sync as we don't expect an error
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.QueueReceiver;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Implements javax.njms.QueueReceiver
+ */
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+    //--- Constructor
+    /**
+     * create a new QueueReceiverImpl.
+     *
+     * @param session         The session from which the QueueReceiverImpl is instantiated.
+     * @param queue           The default queue for this QueueReceiverImpl.
+     * @param messageSelector the message selector for this QueueReceiverImpl.  
+     * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
+     */
+    protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
+    {
+        super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag);
+    }
+
+    //--- Interface  QueueReceiver
+    /**
+     * Get the Queue associated with this queue receiver.
+     *
+     * @return this receiver's Queue
+     * @throws JMSException If getting the queue for this queue receiver fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        checkNotClosed();
+        return (QueueImpl) _destination;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,131 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.QueueSender;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Message;
+
+/**
+ * Implements javax.njms.QueueSender
+ */
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+    //--- Constructor
+    /**
+     * Create a new QueueSenderImpl.
+     *
+     * @param session the session from which the QueueSenderImpl is instantiated
+     * @param queue   the default queue for this QueueSenderImpl
+     * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error.
+     */
+    protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException
+    {
+        super(session, queue);
+    }
+
+    //--- Interface javax.njms.QueueSender
+    /**
+     * Get the queue associated with this QueueSender.
+     *
+     * @return This QueueSender's queue
+     * @throws JMSException If getting the queue for this QueueSender fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) getDestination();
+    }
+
+    /**
+     * Sends a message to the queue. Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+     * and time to live.
+     *
+     * @param message The message to send.
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message) throws JMSException
+    {
+        super.send(message);
+    }
+
+    /**
+     * Send a message to the queue, specifying delivery mode, priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     *  
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Send a message to a queue for an unidentified message producer.
+     * Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+     * and time to live.
+     *
+     * @param queue   The queue to send this message to
+     * @param message The message to send
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message) throws JMSException
+    {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     *
+     * @param queue        The queue to send this message to
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException if sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,154 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * Implementation of javax.njms.QueueSession
+ */
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+    //--- constructor
+    /**
+     * Create a JMS Session
+     *
+     * @param connection      The ConnectionImpl object from which the Session is created.
+     * @param transacted      Indicates if the session transacted.
+     * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+     *                        {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code>
+     *                        parameter is true.
+     * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+     * @throws javax.jms.JMSException         In case of internal error.
+     */
+    protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException
+    {
+        super(connection, transacted, acknowledgeMode,false);
+    }
+
+    //-- Overwritten methods
+    /**
+     * Creates a durable subscriber to the specified topic,
+     *
+     * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+     * @param name  The name used to identify this subscription.
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession");
+    }
+
+    /**
+     * Create a TemporaryTopic.
+     *
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession");
+    }
+
+    /**
+     * Creates a topic identity given a Topicname.
+     *
+     * @param topicName The name of this <CODE>Topic</CODE>
+     * @return Always throws an exception
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke createTopic from QueueSession");
+    }
+
+    /**
+     * Unsubscribes a durable subscription that has been created by a client.
+     *
+     * @param name the name used to identify this subscription
+     * @throws IllegalStateException Always
+     */
+    @Override
+    public void unsubscribe(String name) throws JMSException
+    {
+        throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession");
+    }
+
+    //--- Interface javax.njms.QueueSession
+    /**
+     * Create a QueueReceiver to receive messages from the specified queue.
+     *
+     * @param queue the <CODE>Queue</CODE> to access
+     * @return A QueueReceiver
+     * @throws JMSException                If creating a receiver fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     */
+    public QueueReceiver createReceiver(Queue queue) throws JMSException
+    {
+        return createReceiver(queue, null);
+    }
+
+    /**
+     * Create a QueueReceiver to receive messages from the specified queue for a given message selector.
+     *
+     * @param queue           the Queue to access
+     * @param messageSelector A value of null or an empty string indicates that
+     *                        there is no message selector for the message consumer.
+     * @return A QueueReceiver
+     * @throws JMSException                If creating a receiver fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     * @throws InvalidSelectorException    If the message selector is invalid.
+     */
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+    {
+        checkNotClosed();
+        checkDestination(queue);
+        QueueReceiver receiver;
+        try
+        {
+            receiver =  new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return receiver;
+    }
+
+    /**
+     * Create a QueueSender object to send messages to the specified queue.
+     *
+     * @param queue the Queue to access, or null if this is an unidentified producer
+     * @return A QueueSender
+     * @throws JMSException                If creating the sender fails due to some internal error.
+     * @throws InvalidDestinationException If an invalid queue is specified.
+     */
+    public QueueSender createSender(Queue queue) throws JMSException
+    {
+        checkNotClosed();
+        // we do not check the destination since unidentified producers are allowed (no default destination).
+        return new QueueSenderImpl(this, (QueueImpl) queue);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native