You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 16:33:07 UTC

svn commit: r1526190 [6/7] - in /qpid/trunk/qpid/java/amqp-1-0-client-jms: example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ src/main/java/org/apache/qpid/amqp_1_0/jms/ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ src/main/java/org/apache...

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Wed Sep 25 14:33:06 2013
@@ -1,994 +1,994 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Transaction;
-import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-import org.apache.qpid.amqp_1_0.jms.QueueSender;
-import org.apache.qpid.amqp_1_0.jms.QueueSession;
-import org.apache.qpid.amqp_1_0.jms.Session;
-import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
-import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
-import org.apache.qpid.amqp_1_0.jms.TopicSession;
-import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class SessionImpl implements Session, QueueSession, TopicSession
-{
-    private ConnectionImpl _connection;
-    private AcknowledgeMode _acknowledgeMode;
-    private org.apache.qpid.amqp_1_0.client.Session _session;
-    private MessageFactory _messageFactory;
-    private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
-    private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
-
-    private MessageListener _messageListener;
-    private Dispatcher _dispatcher = new Dispatcher();
-    private Thread _dispatcherThread;
-
-    private boolean _closed;
-
-    private boolean _isQueueSession;
-    private boolean _isTopicSession;
-    private Transaction _txn;
-
-    protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException
-    {
-        _connection = connection;
-        _acknowledgeMode = acknowledgeMode;
-        Connection clientConn = _connection.getClientConnection();
-        try
-        {
-            _session = clientConn.createSession();
-        }
-        catch (ConnectionException e)
-        {
-            final JMSException jmsException = new JMSException(e.getMessage());
-            jmsException.setLinkedException(e);
-            throw jmsException;
-        }
-        _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
-        {
-            @Override
-            public void remoteEnd(End end)
-            {
-                if(!_closed)
-                {
-                    try
-                    {
-                        close();
-                    }
-                    catch (JMSException e)
-                    {
-                    }
-                    try
-                    {
-                        final Error error = end.getError();
-                        final ExceptionListener exceptionListener = _connection.getExceptionListener();
-                        if(exceptionListener != null)
-                        {
-                            if(error != null)
-                            {
-                                exceptionListener.onException(new JMSException(error.getDescription(),
-                                        error.getCondition().getValue().toString()));
-                            }
-                            else
-                            {
-                                exceptionListener.onException(new JMSException("Session remotely closed"));
-                            }
-                        }
-                    }
-                    catch (JMSException e)
-                    {
-
-                    }
-
-                }
-            }
-        });
-        if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
-        {
-            _txn = _session.createSessionLocalTransaction();
-        }
-
-        _messageFactory = new MessageFactory(this);
-
-    }
-
-    public BytesMessageImpl createBytesMessage() throws IllegalStateException
-    {
-        checkClosed();
-        return new BytesMessageImpl(this);
-
-    }
-
-    public MapMessageImpl createMapMessage() throws JMSException
-    {
-        checkClosed();
-        return new MapMessageImpl(this);
-    }
-
-    public MessageImpl createMessage() throws IllegalStateException
-    {
-        return createAmqpMessage();
-    }
-
-    public ObjectMessageImpl createObjectMessage() throws JMSException
-    {
-        checkClosed();
-        return new ObjectMessageImpl(this);
-    }
-
-    public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
-    {
-        checkClosed();
-        ObjectMessageImpl msg = new ObjectMessageImpl(this);
-        msg.setObject(serializable);
-        return msg;
-    }
-
-    public StreamMessageImpl createStreamMessage() throws JMSException
-    {
-        checkClosed();
-        return new StreamMessageImpl(this);
-    }
-
-    public TextMessageImpl createTextMessage() throws JMSException
-    {
-        return createTextMessage("");
-    }
-
-    public TextMessageImpl createTextMessage(final String s) throws JMSException
-    {
-        checkClosed();
-        TextMessageImpl msg = new TextMessageImpl(this);
-        msg.setText(s);
-        return msg;
-    }
-
-    public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
-    {
-        checkClosed();
-        return new AmqpMessageImpl(this);
-    }
-
-    public boolean getTransacted() throws JMSException
-    {
-        checkClosed();
-        return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
-    }
-
-    public int getAcknowledgeMode() throws IllegalStateException
-    {
-        checkClosed();
-        return _acknowledgeMode.ordinal();
-    }
-
-    AcknowledgeMode getAckModeEnum()
-    {
-        return _acknowledgeMode;
-    }
-
-    public void commit() throws JMSException
-    {
-        checkClosed();
-        checkTransactional();
-
-        _txn.commit();
-        for(MessageConsumerImpl consumer : _consumers)
-        {
-            consumer.postCommit();
-        }
-
-        _txn = _session.createSessionLocalTransaction();
-        //TODO
-    }
-
-    public void rollback() throws JMSException
-    {
-        checkClosed();
-        checkTransactional();
-
-        _txn.rollback();
-
-        for(MessageConsumerImpl consumer : _consumers)
-        {
-            consumer.postRollback();
-        }
-
-        _txn = _session.createSessionLocalTransaction();
-
-        //TODO
-    }
-
-    private void checkTransactional() throws JMSException
-    {
-        if(!getTransacted())
-        {
-            throw new IllegalStateException("Session must be transacted in order to perform this operation");
-        }
-    }
-
-    public void close() throws JMSException
-    {
-        if(!_closed)
-        {
-            _closed = true;
-            _dispatcher.close();
-            for(MessageConsumerImpl consumer : _consumers)
-            {
-                consumer.close();
-            }
-            for(MessageProducerImpl producer : _producers)
-            {
-                producer.close();
-            }
-            _session.close();
-            _connection.removeSession(this);
-        }
-    }
-
-    private void checkClosed() throws IllegalStateException
-    {
-        if(_closed)
-        {
-            throw new IllegalStateException("Closed");
-        }
-    }
-
-    public void recover() throws JMSException
-    {
-        checkClosed();
-        checkNotTransactional();
-
-        if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
-        {
-            synchronized(_session.getEndpoint().getLock())
-            {
-                for(MessageConsumerImpl consumer : _consumers)
-                {
-                    consumer.doRecover();
-                }
-            }
-        }
-        else
-        {
-            if(Thread.currentThread() == _dispatcherThread)
-            {
-                _dispatcher.doRecover();
-            }
-        }
-
-    }
-
-    private void checkNotTransactional() throws JMSException
-    {
-
-        if(getTransacted())
-        {
-            throw new IllegalStateException("This operation cannot be carried out on a transacted session");
-        }
-    }
-
-    public MessageListener getMessageListener() throws JMSException
-    {
-        return _messageListener;
-    }
-
-    public void setMessageListener(final MessageListener messageListener) throws JMSException
-    {
-        if(_messageListener != null)
-        {
-            // TODO
-        }
-        else
-        {
-            _messageListener = messageListener;
-        }
-    }
-
-    public void run()
-    {
-        //TODO
-    }
-
-    public MessageProducerImpl createProducer(final Destination destination) throws JMSException
-    {
-        checkClosed();
-
-        final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
-
-        _producers.add(messageProducer);
-
-        return messageProducer;
-    }
-
-    public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
-    {
-        checkClosed();
-        return createConsumer(destination, null, false);
-    }
-
-    public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
-    {
-        checkClosed();
-        return createConsumer(destination, selector, false);
-    }
-
-    public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
-            throws JMSException
-    {
-        checkClosed();
-        checkValidDestination(destination);
-        if(destination instanceof TemporaryDestination)
-        {
-            TemporaryDestination temporaryDestination = (TemporaryDestination) destination;
-            if(temporaryDestination.getSession() != this)
-            {
-                throw new JMSException("Cannot consume from a temporary destination created on another session");
-            }
-            if(temporaryDestination.isDeleted())
-            {
-                throw new IllegalStateException("Destination is deleted");
-            }
-        }
-        final MessageConsumerImpl messageConsumer;
-        synchronized(_session.getEndpoint().getLock())
-        {
-            if(_dispatcherThread == null)
-            {
-                _dispatcherThread = new Thread(_dispatcher);
-                _dispatcherThread.start();
-            }
-
-            messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal);
-            addConsumer(messageConsumer);
-            if(_connection.isStarted())
-            {
-                messageConsumer.start();
-            }
-        }
-        return messageConsumer;
-    }
-
-    private void checkValidDestination(Destination destination) throws InvalidDestinationException
-    {
-        if (destination == null || !(destination instanceof DestinationImpl))
-        {
-            throw new InvalidDestinationException("Invalid Destination");
-        }
-    }
-
-
-    protected void addConsumer(final MessageConsumerImpl messageConsumer)
-    {
-        _consumers.add(messageConsumer);
-    }
-
-    public QueueImpl createQueue(final String s) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        return new QueueImpl(s);
-    }
-
-    public QueueReceiver createReceiver(final Queue queue) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        return createConsumer(queue);
-    }
-
-    public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        return createConsumer(queue, selector);
-    }
-
-    public QueueSender createSender(final Queue queue) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        return createProducer(queue);
-    }
-
-    public TopicImpl createTopic(final String s) throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        return new TopicImpl(s);
-    }
-
-    public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        return createConsumer(topic);
-    }
-
-    public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        return createConsumer(topic, selector, noLocal);
-    }
-
-    public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        return createDurableSubscriber(topic, name, null, false);
-    }
-
-    private void checkNotQueueSession() throws IllegalStateException
-    {
-        if(_isQueueSession)
-        {
-            throw new IllegalStateException("Cannot perform this operation on a QueueSession");
-        }
-    }
-
-
-    private void checkNotTopicSession() throws IllegalStateException
-    {
-        if(_isTopicSession)
-        {
-            throw new IllegalStateException("Cannot perform this operation on a TopicSession");
-        }
-    }
-
-    public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
-            throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        if(!(topic instanceof TopicImpl))
-        {
-            throw new InvalidDestinationException("invalid destination " + topic);
-        }
-        final TopicSubscriberImpl messageConsumer;
-        synchronized(_session.getEndpoint().getLock())
-        {
-            messageConsumer = new TopicSubscriberImpl(name, true, (org.apache.qpid.amqp_1_0.jms.Topic) topic, this,
-                                                      selector,
-                                                      noLocal);
-            addConsumer(messageConsumer);
-            if(_connection.isStarted())
-            {
-                messageConsumer.start();
-            }
-        }
-        return messageConsumer;
-    }
-
-    public TopicPublisher createPublisher(final Topic topic) throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        return createProducer(topic);
-    }
-
-    public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        checkValidDestination(queue);
-        return createBrowser(queue, null);
-    }
-
-    public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        checkValidDestination(queue);
-
-        return new QueueBrowserImpl((QueueImpl) queue, selector, this);
-
-    }
-
-    public TemporaryQueueImpl createTemporaryQueue() throws JMSException
-    {
-        checkClosed();
-        checkNotTopicSession();
-        try
-        {
-            Sender send = _session.createTemporaryQueueSender();
-
-            TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send, this);
-            return tempQ;
-        }
-        catch (Sender.SenderCreationException e)
-        {
-            throw new JMSException("Unable to create temporary queue");
-        }
-        catch (ConnectionClosedException e)
-        {
-            throw new JMSException("Unable to create temporary queue");
-        }
-    }
-
-    public TemporaryTopicImpl createTemporaryTopic() throws JMSException
-    {
-        checkClosed();
-        checkNotQueueSession();
-        try
-        {
-            Sender send = _session.createTemporaryQueueSender();
-
-            TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send, this);
-            return tempQ;
-        }
-        catch (Sender.SenderCreationException e)
-        {
-            throw new JMSException("Unable to create temporary queue");
-        }
-        catch (ConnectionClosedException e)
-        {
-            throw new JMSException("Unable to create temporary queue");
-        }
-    }
-
-    public void unsubscribe(final String s) throws JMSException
-    {
-        checkClosed();
-
-        checkNotQueueSession();
-
-        Target target = new Target();
-        target.setAddress(UUID.randomUUID().toString());
-
-        try
-        {
-            Receiver receiver = new Receiver(getClientSession(), s, target, null,
-                                             org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
-
-            final org.apache.qpid.amqp_1_0.type.Source receiverSource = receiver.getSource();
-            if(receiverSource instanceof Source)
-            {
-                Source source = (Source) receiverSource;
-                receiver.close();
-                receiver = new Receiver(getClientSession(), s, target, source,
-                        org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
-
-            }
-            receiver.close();
-        }
-        catch(ConnectionErrorException  e)
-        {
-            if(e.getRemoteError().getCondition() == AmqpError.NOT_FOUND)
-            {
-                throw new InvalidDestinationException(s);
-            }
-            else
-            {
-                JMSException jmsException = new JMSException(e.getMessage());
-                jmsException.setLinkedException(e);
-                throw jmsException;
-            }
-        }
-
-        //TODO
-    }
-
-    void stop()
-    {
-        //TODO
-    }
-
-    void start()
-    {
-        _dispatcher.start();
-        for(MessageConsumerImpl consumer : _consumers)
-        {
-            consumer.start();
-        }
-    }
-
-    org.apache.qpid.amqp_1_0.client.Session getClientSession()
-    {
-        return _session;
-    }
-
-    public MessageFactory getMessageFactory()
-    {
-        return _messageFactory;
-    }
-
-    void acknowledgeAll() throws IllegalStateException
-    {
-        synchronized(_session.getEndpoint().getLock())
-        {
-            checkClosed();
-            for(MessageConsumerImpl consumer : _consumers)
-            {
-                consumer.acknowledgeAll();
-            }
-        }
-    }
-
-    void messageListenerSet(final MessageConsumerImpl messageConsumer)
-    {
-        _dispatcher.updateMessageListener(messageConsumer);
-    }
-
-    public void messageArrived(final MessageConsumerImpl messageConsumer)
-    {
-        _dispatcher.messageArrivedAtConsumer(messageConsumer);
-    }
-
-    MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
-    {
-        MessageImpl replacementMessage;
-
-        if(message instanceof BytesMessage)
-        {
-            replacementMessage = convertBytesMessage((BytesMessage) message);
-        }
-        else
-        {
-            if(message instanceof MapMessage)
-            {
-                replacementMessage = convertMapMessage((MapMessage) message);
-            }
-            else
-            {
-                if(message instanceof ObjectMessage)
-                {
-                    replacementMessage = convertObjectMessage((ObjectMessage) message);
-                }
-                else
-                {
-                    if(message instanceof StreamMessage)
-                    {
-                        replacementMessage = convertStreamMessage((StreamMessage) message);
-                    }
-                    else
-                    {
-                        if(message instanceof TextMessage)
-                        {
-                            replacementMessage = convertTextMessage((TextMessage) message);
-                        }
-                        else
-                        {
-                            replacementMessage = createMessage();
-                        }
-                    }
-                }
-            }
-        }
-
-        convertMessageProperties(message, replacementMessage);
-
-        return replacementMessage;
-    }
-
-
-    private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
-            throws JMSException
-    {
-        Enumeration propertyNames = message.getPropertyNames();
-        while (propertyNames.hasMoreElements())
-        {
-            String propertyName = String.valueOf(propertyNames.nextElement());
-            // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
-            if (!propertyName.startsWith("JMSX_"))
-            {
-                Object value = message.getObjectProperty(propertyName);
-                replacementMessage.setObjectProperty(propertyName, value);
-            }
-        }
-
-
-        replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
-
-        if (message.getJMSReplyTo() != null)
-        {
-            replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
-        }
-
-        replacementMessage.setJMSType(message.getJMSType());
-
-        replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
-    }
-
-    private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
-    {
-        MapMessageImpl mapMessage = createMapMessage();
-
-        Enumeration mapNames = message.getMapNames();
-        while (mapNames.hasMoreElements())
-        {
-            String name = (String) mapNames.nextElement();
-            mapMessage.setObject(name, message.getObject(name));
-        }
-
-        return mapMessage;
-    }
-
-    private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
-    {
-        BytesMessageImpl bytesMessage = createBytesMessage();
-
-        message.reset();
-
-        byte[] buf = new byte[1024];
-
-        int len;
-
-        while ((len = message.readBytes(buf)) != -1)
-        {
-            bytesMessage.writeBytes(buf, 0, len);
-        }
-
-        return bytesMessage;
-    }
-
-    private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
-    {
-        ObjectMessageImpl objectMessage = createObjectMessage();
-        objectMessage.setObject(message.getObject());
-        return objectMessage;
-    }
-
-    private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
-    {
-        StreamMessageImpl streamMessage = createStreamMessage();
-
-        try
-        {
-            message.reset();
-            while (true)
-            {
-                streamMessage.writeObject(message.readObject());
-            }
-        }
-        catch (MessageEOFException e)
-        {
-            // we're at the end so don't mind the exception
-        }
-
-        return streamMessage;
-    }
-
-    private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
-    {
-        return createTextMessage(message.getText());
-    }
-
-    ConnectionImpl getConnection()
-    {
-        return _connection;
-    }
-
-    Transaction getTxn()
-    {
-        return _txn;
-    }
-
-    private class Dispatcher implements Runnable
-    {
-
-        private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
-
-        private boolean _closed;
-        private boolean _started;
-
-        private Message _recoveredMessage;
-        private MessageConsumerImpl _recoveredConsumer;
-        private MessageConsumerImpl _currentConsumer;
-        private Message _currentMessage;
-
-        public void run()
-        {
-            synchronized(getLock())
-            {
-                while(!_closed)
-                {
-                    while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
-                    {
-                        try
-                        {
-                            getLock().wait();
-                        }
-                        catch (InterruptedException e)
-                        {
-                            return;
-                        }
-                    }
-                    while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
-                    {
-                        Message msg;
-
-                        MessageConsumerImpl consumer;
-
-                        boolean recoveredMessage =  _recoveredMessage != null;
-                        if(recoveredMessage)
-                        {
-                            consumer = _recoveredConsumer;
-                            msg = _recoveredMessage;
-                            _recoveredMessage = null;
-                            _recoveredConsumer = null;
-                        }
-                        else
-                        {
-                            consumer = _messageConsumerList.remove(0);
-                            msg = consumer.receive0(0L);
-                        }
-
-                        MessageListener listener = consumer._messageListener;
-
-                        MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
-
-                        if(message != null)
-                        {
-                            if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
-                            {
-                                consumer.setLastUnackedMessage(msg.getDeliveryTag());
-                            }
-                            _currentConsumer = consumer;
-                            _currentMessage = msg;
-                            try
-                            {
-                                listener.onMessage(message);
-                            }
-                            finally
-                            {
-                                _currentConsumer = null;
-                                _currentMessage = null;
-                            }
-
-                            if(_recoveredMessage == null)
-                            {
-                                consumer.preReceiveAction(msg);
-                            }
-
-                        }
-
-                    }
-                    Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
-                    while(consumers.hasNext())
-                    {
-                        MessageConsumerImpl consumer = consumers.next();
-                        try
-                        {
-                            consumer.checkReceiverError();
-                        }
-                        catch (JMSException e)
-                        {
-
-                            consumers.remove();
-                            try
-                            {
-                                _connection.getExceptionListener().onException(e);
-                                consumer.close();
-                            }
-                            catch (JMSException e1)
-                            {
-                            }
-                        }
-                    }
-
-                }
-            }
-        }
-
-        private Object getLock()
-        {
-            return _session.getEndpoint().getLock();
-        }
-
-        public void messageArrivedAtConsumer(MessageConsumerImpl impl)
-        {
-            synchronized (getLock())
-            {
-                _messageConsumerList.add(impl);
-                getLock().notifyAll();
-            }
-        }
-
-        public void close()
-        {
-            synchronized (getLock())
-            {
-                _closed = true;
-                getLock().notifyAll();
-            }
-        }
-
-        public void updateMessageListener(final MessageConsumerImpl messageConsumer)
-        {
-            synchronized (getLock())
-            {
-                getLock().notifyAll();
-            }
-        }
-
-        public void start()
-        {
-            synchronized (getLock())
-            {
-                _started = true;
-                getLock().notifyAll();
-            }
-        }
-
-        public void stop()
-        {
-            synchronized (getLock())
-            {
-                _started = false;
-                getLock().notifyAll();
-            }
-        }
-
-        public void doRecover()
-        {
-            _recoveredConsumer = _currentConsumer;
-            _recoveredMessage = _currentMessage;
-        }
-    }
-
-    void setQueueSession(final boolean queueSession)
-    {
-        _isQueueSession = queueSession;
-    }
-
-    void setTopicSession(final boolean topicSession)
-    {
-        _isTopicSession = topicSession;
-    }
-
-    String toAddress(DestinationImpl dest)
-    {
-        return _connection.toDecodedDestination(dest).getAddress();
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Transaction;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.jms.TopicSession;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class SessionImpl implements Session, QueueSession, TopicSession
+{
+    private ConnectionImpl _connection;
+    private AcknowledgeMode _acknowledgeMode;
+    private org.apache.qpid.amqp_1_0.client.Session _session;
+    private MessageFactory _messageFactory;
+    private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+    private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
+
+    private MessageListener _messageListener;
+    private Dispatcher _dispatcher = new Dispatcher();
+    private Thread _dispatcherThread;
+
+    private boolean _closed;
+
+    private boolean _isQueueSession;
+    private boolean _isTopicSession;
+    private Transaction _txn;
+
+    protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException
+    {
+        _connection = connection;
+        _acknowledgeMode = acknowledgeMode;
+        Connection clientConn = _connection.getClientConnection();
+        try
+        {
+            _session = clientConn.createSession();
+        }
+        catch (ConnectionException e)
+        {
+            final JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+        _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+        {
+            @Override
+            public void remoteEnd(End end)
+            {
+                if(!_closed)
+                {
+                    try
+                    {
+                        close();
+                    }
+                    catch (JMSException e)
+                    {
+                    }
+                    try
+                    {
+                        final Error error = end.getError();
+                        final ExceptionListener exceptionListener = _connection.getExceptionListener();
+                        if(exceptionListener != null)
+                        {
+                            if(error != null)
+                            {
+                                exceptionListener.onException(new JMSException(error.getDescription(),
+                                        error.getCondition().getValue().toString()));
+                            }
+                            else
+                            {
+                                exceptionListener.onException(new JMSException("Session remotely closed"));
+                            }
+                        }
+                    }
+                    catch (JMSException e)
+                    {
+
+                    }
+
+                }
+            }
+        });
+        if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
+        {
+            _txn = _session.createSessionLocalTransaction();
+        }
+
+        _messageFactory = new MessageFactory(this);
+
+    }
+
+    public BytesMessageImpl createBytesMessage() throws IllegalStateException
+    {
+        checkClosed();
+        return new BytesMessageImpl(this);
+
+    }
+
+    public MapMessageImpl createMapMessage() throws JMSException
+    {
+        checkClosed();
+        return new MapMessageImpl(this);
+    }
+
+    public MessageImpl createMessage() throws IllegalStateException
+    {
+        return createAmqpMessage();
+    }
+
+    public ObjectMessageImpl createObjectMessage() throws JMSException
+    {
+        checkClosed();
+        return new ObjectMessageImpl(this);
+    }
+
+    public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
+    {
+        checkClosed();
+        ObjectMessageImpl msg = new ObjectMessageImpl(this);
+        msg.setObject(serializable);
+        return msg;
+    }
+
+    public StreamMessageImpl createStreamMessage() throws JMSException
+    {
+        checkClosed();
+        return new StreamMessageImpl(this);
+    }
+
+    public TextMessageImpl createTextMessage() throws JMSException
+    {
+        return createTextMessage("");
+    }
+
+    public TextMessageImpl createTextMessage(final String s) throws JMSException
+    {
+        checkClosed();
+        TextMessageImpl msg = new TextMessageImpl(this);
+        msg.setText(s);
+        return msg;
+    }
+
+    public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
+    {
+        checkClosed();
+        return new AmqpMessageImpl(this);
+    }
+
+    public boolean getTransacted() throws JMSException
+    {
+        checkClosed();
+        return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
+    }
+
+    public int getAcknowledgeMode() throws IllegalStateException
+    {
+        checkClosed();
+        return _acknowledgeMode.ordinal();
+    }
+
+    AcknowledgeMode getAckModeEnum()
+    {
+        return _acknowledgeMode;
+    }
+
+    public void commit() throws JMSException
+    {
+        checkClosed();
+        checkTransactional();
+
+        _txn.commit();
+        for(MessageConsumerImpl consumer : _consumers)
+        {
+            consumer.postCommit();
+        }
+
+        _txn = _session.createSessionLocalTransaction();
+        //TODO
+    }
+
+    public void rollback() throws JMSException
+    {
+        checkClosed();
+        checkTransactional();
+
+        _txn.rollback();
+
+        for(MessageConsumerImpl consumer : _consumers)
+        {
+            consumer.postRollback();
+        }
+
+        _txn = _session.createSessionLocalTransaction();
+
+        //TODO
+    }
+
+    private void checkTransactional() throws JMSException
+    {
+        if(!getTransacted())
+        {
+            throw new IllegalStateException("Session must be transacted in order to perform this operation");
+        }
+    }
+
+    public void close() throws JMSException
+    {
+        if(!_closed)
+        {
+            _closed = true;
+            _dispatcher.close();
+            for(MessageConsumerImpl consumer : _consumers)
+            {
+                consumer.close();
+            }
+            for(MessageProducerImpl producer : _producers)
+            {
+                producer.close();
+            }
+            _session.close();
+            _connection.removeSession(this);
+        }
+    }
+
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_closed)
+        {
+            throw new IllegalStateException("Closed");
+        }
+    }
+
+    public void recover() throws JMSException
+    {
+        checkClosed();
+        checkNotTransactional();
+
+        if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+        {
+            synchronized(_session.getEndpoint().getLock())
+            {
+                for(MessageConsumerImpl consumer : _consumers)
+                {
+                    consumer.doRecover();
+                }
+            }
+        }
+        else
+        {
+            if(Thread.currentThread() == _dispatcherThread)
+            {
+                _dispatcher.doRecover();
+            }
+        }
+
+    }
+
+    private void checkNotTransactional() throws JMSException
+    {
+
+        if(getTransacted())
+        {
+            throw new IllegalStateException("This operation cannot be carried out on a transacted session");
+        }
+    }
+
+    public MessageListener getMessageListener() throws JMSException
+    {
+        return _messageListener;
+    }
+
+    public void setMessageListener(final MessageListener messageListener) throws JMSException
+    {
+        if(_messageListener != null)
+        {
+            // TODO
+        }
+        else
+        {
+            _messageListener = messageListener;
+        }
+    }
+
+    public void run()
+    {
+        //TODO
+    }
+
+    public MessageProducerImpl createProducer(final Destination destination) throws JMSException
+    {
+        checkClosed();
+
+        final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
+
+        _producers.add(messageProducer);
+
+        return messageProducer;
+    }
+
+    public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(destination, null, false);
+    }
+
+    public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(destination, selector, false);
+    }
+
+    public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
+            throws JMSException
+    {
+        checkClosed();
+        checkValidDestination(destination);
+        if(destination instanceof TemporaryDestination)
+        {
+            TemporaryDestination temporaryDestination = (TemporaryDestination) destination;
+            if(temporaryDestination.getSession() != this)
+            {
+                throw new JMSException("Cannot consume from a temporary destination created on another session");
+            }
+            if(temporaryDestination.isDeleted())
+            {
+                throw new IllegalStateException("Destination is deleted");
+            }
+        }
+        final MessageConsumerImpl messageConsumer;
+        synchronized(_session.getEndpoint().getLock())
+        {
+            if(_dispatcherThread == null)
+            {
+                _dispatcherThread = new Thread(_dispatcher);
+                _dispatcherThread.start();
+            }
+
+            messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal);
+            addConsumer(messageConsumer);
+            if(_connection.isStarted())
+            {
+                messageConsumer.start();
+            }
+        }
+        return messageConsumer;
+    }
+
+    private void checkValidDestination(Destination destination) throws InvalidDestinationException
+    {
+        if (destination == null || !(destination instanceof DestinationImpl))
+        {
+            throw new InvalidDestinationException("Invalid Destination");
+        }
+    }
+
+
+    protected void addConsumer(final MessageConsumerImpl messageConsumer)
+    {
+        _consumers.add(messageConsumer);
+    }
+
+    public QueueImpl createQueue(final String s) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        return new QueueImpl(s);
+    }
+
+    public QueueReceiver createReceiver(final Queue queue) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        return createConsumer(queue);
+    }
+
+    public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        return createConsumer(queue, selector);
+    }
+
+    public QueueSender createSender(final Queue queue) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        return createProducer(queue);
+    }
+
+    public TopicImpl createTopic(final String s) throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        return new TopicImpl(s);
+    }
+
+    public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        return createConsumer(topic);
+    }
+
+    public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        return createConsumer(topic, selector, noLocal);
+    }
+
+    public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        return createDurableSubscriber(topic, name, null, false);
+    }
+
+    private void checkNotQueueSession() throws IllegalStateException
+    {
+        if(_isQueueSession)
+        {
+            throw new IllegalStateException("Cannot perform this operation on a QueueSession");
+        }
+    }
+
+
+    private void checkNotTopicSession() throws IllegalStateException
+    {
+        if(_isTopicSession)
+        {
+            throw new IllegalStateException("Cannot perform this operation on a TopicSession");
+        }
+    }
+
+    public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
+            throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        if(!(topic instanceof TopicImpl))
+        {
+            throw new InvalidDestinationException("invalid destination " + topic);
+        }
+        final TopicSubscriberImpl messageConsumer;
+        synchronized(_session.getEndpoint().getLock())
+        {
+            messageConsumer = new TopicSubscriberImpl(name, true, (org.apache.qpid.amqp_1_0.jms.Topic) topic, this,
+                                                      selector,
+                                                      noLocal);
+            addConsumer(messageConsumer);
+            if(_connection.isStarted())
+            {
+                messageConsumer.start();
+            }
+        }
+        return messageConsumer;
+    }
+
+    public TopicPublisher createPublisher(final Topic topic) throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        return createProducer(topic);
+    }
+
+    public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        checkValidDestination(queue);
+        return createBrowser(queue, null);
+    }
+
+    public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        checkValidDestination(queue);
+
+        return new QueueBrowserImpl((QueueImpl) queue, selector, this);
+
+    }
+
+    public TemporaryQueueImpl createTemporaryQueue() throws JMSException
+    {
+        checkClosed();
+        checkNotTopicSession();
+        try
+        {
+            Sender send = _session.createTemporaryQueueSender();
+
+            TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send, this);
+            return tempQ;
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
+        catch (ConnectionClosedException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
+    }
+
+    public TemporaryTopicImpl createTemporaryTopic() throws JMSException
+    {
+        checkClosed();
+        checkNotQueueSession();
+        try
+        {
+            Sender send = _session.createTemporaryQueueSender();
+
+            TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send, this);
+            return tempQ;
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
+        catch (ConnectionClosedException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
+    }
+
+    public void unsubscribe(final String s) throws JMSException
+    {
+        checkClosed();
+
+        checkNotQueueSession();
+
+        Target target = new Target();
+        target.setAddress(UUID.randomUUID().toString());
+
+        try
+        {
+            Receiver receiver = new Receiver(getClientSession(), s, target, null,
+                                             org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
+
+            final org.apache.qpid.amqp_1_0.type.Source receiverSource = receiver.getSource();
+            if(receiverSource instanceof Source)
+            {
+                Source source = (Source) receiverSource;
+                receiver.close();
+                receiver = new Receiver(getClientSession(), s, target, source,
+                        org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
+
+            }
+            receiver.close();
+        }
+        catch(ConnectionErrorException  e)
+        {
+            if(e.getRemoteError().getCondition() == AmqpError.NOT_FOUND)
+            {
+                throw new InvalidDestinationException(s);
+            }
+            else
+            {
+                JMSException jmsException = new JMSException(e.getMessage());
+                jmsException.setLinkedException(e);
+                throw jmsException;
+            }
+        }
+
+        //TODO
+    }
+
+    void stop()
+    {
+        //TODO
+    }
+
+    void start()
+    {
+        _dispatcher.start();
+        for(MessageConsumerImpl consumer : _consumers)
+        {
+            consumer.start();
+        }
+    }
+
+    org.apache.qpid.amqp_1_0.client.Session getClientSession()
+    {
+        return _session;
+    }
+
+    public MessageFactory getMessageFactory()
+    {
+        return _messageFactory;
+    }
+
+    void acknowledgeAll() throws IllegalStateException
+    {
+        synchronized(_session.getEndpoint().getLock())
+        {
+            checkClosed();
+            for(MessageConsumerImpl consumer : _consumers)
+            {
+                consumer.acknowledgeAll();
+            }
+        }
+    }
+
+    void messageListenerSet(final MessageConsumerImpl messageConsumer)
+    {
+        _dispatcher.updateMessageListener(messageConsumer);
+    }
+
+    public void messageArrived(final MessageConsumerImpl messageConsumer)
+    {
+        _dispatcher.messageArrivedAtConsumer(messageConsumer);
+    }
+
+    MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
+    {
+        MessageImpl replacementMessage;
+
+        if(message instanceof BytesMessage)
+        {
+            replacementMessage = convertBytesMessage((BytesMessage) message);
+        }
+        else
+        {
+            if(message instanceof MapMessage)
+            {
+                replacementMessage = convertMapMessage((MapMessage) message);
+            }
+            else
+            {
+                if(message instanceof ObjectMessage)
+                {
+                    replacementMessage = convertObjectMessage((ObjectMessage) message);
+                }
+                else
+                {
+                    if(message instanceof StreamMessage)
+                    {
+                        replacementMessage = convertStreamMessage((StreamMessage) message);
+                    }
+                    else
+                    {
+                        if(message instanceof TextMessage)
+                        {
+                            replacementMessage = convertTextMessage((TextMessage) message);
+                        }
+                        else
+                        {
+                            replacementMessage = createMessage();
+                        }
+                    }
+                }
+            }
+        }
+
+        convertMessageProperties(message, replacementMessage);
+
+        return replacementMessage;
+    }
+
+
+    private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
+            throws JMSException
+    {
+        Enumeration propertyNames = message.getPropertyNames();
+        while (propertyNames.hasMoreElements())
+        {
+            String propertyName = String.valueOf(propertyNames.nextElement());
+            // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+            if (!propertyName.startsWith("JMSX_"))
+            {
+                Object value = message.getObjectProperty(propertyName);
+                replacementMessage.setObjectProperty(propertyName, value);
+            }
+        }
+
+
+        replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+        if (message.getJMSReplyTo() != null)
+        {
+            replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
+        }
+
+        replacementMessage.setJMSType(message.getJMSType());
+
+        replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+    }
+
+    private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
+    {
+        MapMessageImpl mapMessage = createMapMessage();
+
+        Enumeration mapNames = message.getMapNames();
+        while (mapNames.hasMoreElements())
+        {
+            String name = (String) mapNames.nextElement();
+            mapMessage.setObject(name, message.getObject(name));
+        }
+
+        return mapMessage;
+    }
+
+    private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
+    {
+        BytesMessageImpl bytesMessage = createBytesMessage();
+
+        message.reset();
+
+        byte[] buf = new byte[1024];
+
+        int len;
+
+        while ((len = message.readBytes(buf)) != -1)
+        {
+            bytesMessage.writeBytes(buf, 0, len);
+        }
+
+        return bytesMessage;
+    }
+
+    private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
+    {
+        ObjectMessageImpl objectMessage = createObjectMessage();
+        objectMessage.setObject(message.getObject());
+        return objectMessage;
+    }
+
+    private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
+    {
+        StreamMessageImpl streamMessage = createStreamMessage();
+
+        try
+        {
+            message.reset();
+            while (true)
+            {
+                streamMessage.writeObject(message.readObject());
+            }
+        }
+        catch (MessageEOFException e)
+        {
+            // we're at the end so don't mind the exception
+        }
+
+        return streamMessage;
+    }
+
+    private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
+    {
+        return createTextMessage(message.getText());
+    }
+
+    ConnectionImpl getConnection()
+    {
+        return _connection;
+    }
+
+    Transaction getTxn()
+    {
+        return _txn;
+    }
+
+    private class Dispatcher implements Runnable
+    {
+
+        private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
+
+        private boolean _closed;
+        private boolean _started;
+
+        private Message _recoveredMessage;
+        private MessageConsumerImpl _recoveredConsumer;
+        private MessageConsumerImpl _currentConsumer;
+        private Message _currentMessage;
+
+        public void run()
+        {
+            synchronized(getLock())
+            {
+                while(!_closed)
+                {
+                    while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
+                    {
+                        try
+                        {
+                            getLock().wait();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            return;
+                        }
+                    }
+                    while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
+                    {
+                        Message msg;
+
+                        MessageConsumerImpl consumer;
+
+                        boolean recoveredMessage =  _recoveredMessage != null;
+                        if(recoveredMessage)
+                        {
+                            consumer = _recoveredConsumer;
+                            msg = _recoveredMessage;
+                            _recoveredMessage = null;
+                            _recoveredConsumer = null;
+                        }
+                        else
+                        {
+                            consumer = _messageConsumerList.remove(0);
+                            msg = consumer.receive0(0L);
+                        }
+
+                        MessageListener listener = consumer._messageListener;
+
+                        MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
+
+                        if(message != null)
+                        {
+                            if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+                            {
+                                consumer.setLastUnackedMessage(msg.getDeliveryTag());
+                            }
+                            _currentConsumer = consumer;
+                            _currentMessage = msg;
+                            try
+                            {
+                                listener.onMessage(message);
+                            }
+                            finally
+                            {
+                                _currentConsumer = null;
+                                _currentMessage = null;
+                            }
+
+                            if(_recoveredMessage == null)
+                            {
+                                consumer.preReceiveAction(msg);
+                            }
+
+                        }
+
+                    }
+                    Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+                    while(consumers.hasNext())
+                    {
+                        MessageConsumerImpl consumer = consumers.next();
+                        try
+                        {
+                            consumer.checkReceiverError();
+                        }
+                        catch (JMSException e)
+                        {
+
+                            consumers.remove();
+                            try
+                            {
+                                _connection.getExceptionListener().onException(e);
+                                consumer.close();
+                            }
+                            catch (JMSException e1)
+                            {
+                            }
+                        }
+                    }
+
+                }
+            }
+        }
+
+        private Object getLock()
+        {
+            return _session.getEndpoint().getLock();
+        }
+
+        public void messageArrivedAtConsumer(MessageConsumerImpl impl)
+        {
+            synchronized (getLock())
+            {
+                _messageConsumerList.add(impl);
+                getLock().notifyAll();
+            }
+        }
+
+        public void close()
+        {
+            synchronized (getLock())
+            {
+                _closed = true;
+                getLock().notifyAll();
+            }
+        }
+
+        public void updateMessageListener(final MessageConsumerImpl messageConsumer)
+        {
+            synchronized (getLock())
+            {
+                getLock().notifyAll();
+            }
+        }
+
+        public void start()
+        {
+            synchronized (getLock())
+            {
+                _started = true;
+                getLock().notifyAll();
+            }
+        }
+
+        public void stop()
+        {
+            synchronized (getLock())
+            {
+                _started = false;
+                getLock().notifyAll();
+            }
+        }
+
+        public void doRecover()
+        {
+            _recoveredConsumer = _currentConsumer;
+            _recoveredMessage = _currentMessage;
+        }
+    }
+
+    void setQueueSession(final boolean queueSession)
+    {
+        _isQueueSession = queueSession;
+    }
+
+    void setTopicSession(final boolean topicSession)
+    {
+        _isTopicSession = topicSession;
+    }
+
+    String toAddress(DestinationImpl dest)
+    {
+        return _connection.toDecodedDestination(dest).getAddress();
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org