You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 16:33:07 UTC

svn commit: r1526190 [5/7] - in /qpid/trunk/qpid/java/amqp-1-0-client-jms: example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ src/main/java/org/apache/qpid/amqp_1_0/jms/ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ src/main/java/org/apache...

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Wed Sep 25 14:33:06 2013
@@ -1,489 +1,489 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
-import org.apache.qpid.amqp_1_0.jms.QueueSender;
-import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
-import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.util.UUID;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
-{
-    private boolean _disableMessageID;
-    private boolean _disableMessageTimestamp;
-    private int _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-    private int _priority = Message.DEFAULT_PRIORITY;
-    private long _timeToLive;
-
-    private DestinationImpl _destination;
-    private SessionImpl _session;
-    private Sender _sender;
-    private boolean _closed;
-    private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
-    private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
-
-    protected MessageProducerImpl(final Destination destination,
-                               final SessionImpl session) throws JMSException
-    {
-        if(destination instanceof DestinationImpl)
-        {
-            _destination = (DestinationImpl) destination;
-        }
-        else if(destination != null)
-        {
-            throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
-        }
-
-        _session = session;
-
-        if(_destination != null)
-        {
-            try
-            {
-                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
-            }
-            catch (Sender.SenderCreationException e)
-            {
-                // TODO - refine exception
-                JMSException jmsEx = new JMSException(e.getMessage());
-                jmsEx.initCause(e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-            catch (ConnectionClosedException e)
-            {
-
-                // TODO - refine exception
-                JMSException jmsEx = new JMSException(e.getMessage());
-                jmsEx.initCause(e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-            _sender.setRemoteErrorListener(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            try
-                            {
-                                final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
-
-                                if(exceptionListener != null)
-                                {
-                                    final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
-                                    exceptionListener.onException(new JMSException(receiverError.getDescription(),
-                                            receiverError.getCondition().getValue().toString()));
-
-                                }
-                            }
-                            catch (JMSException e)
-                            {
-
-                            }
-                        }
-                    });
-        }
-    }
-
-    private void checkClosed() throws IllegalStateException
-    {
-        if(_closed)
-        {
-            throw new javax.jms.IllegalStateException("Producer closed");
-        }
-    }
-
-    public boolean getDisableMessageID() throws IllegalStateException
-    {
-        checkClosed();
-        return _disableMessageID;
-    }
-
-    public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
-    {
-        checkClosed();
-        _disableMessageID = disableMessageID;
-    }
-
-    public boolean getDisableMessageTimestamp() throws IllegalStateException
-    {
-        checkClosed();
-        return _disableMessageTimestamp;
-    }
-
-    public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
-    {
-        checkClosed();
-        _disableMessageTimestamp = disableMessageTimestamp;
-    }
-
-    public int getDeliveryMode() throws IllegalStateException
-    {
-        checkClosed();
-        return _deliveryMode;
-    }
-
-    public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
-    {
-        checkClosed();
-        _deliveryMode = deliveryMode;
-    }
-
-    public int getPriority() throws IllegalStateException
-    {
-        checkClosed();
-        return _priority;
-    }
-
-    public void setPriority(final int priority) throws IllegalStateException
-    {
-        checkClosed();
-        _priority = priority;
-    }
-
-    public long getTimeToLive() throws IllegalStateException
-    {
-        checkClosed();
-        return _timeToLive;
-    }
-
-    public void setTimeToLive(final long timeToLive) throws IllegalStateException
-    {
-        checkClosed();
-        _timeToLive = timeToLive;
-    }
-
-    public DestinationImpl getDestination() throws JMSException
-    {
-        checkClosed();
-        return _destination;
-    }
-
-    public void close() throws JMSException
-    {
-        try
-        {
-            if(!_closed)
-            {
-                _closed = true;
-                if(_sender != null)
-                {
-                    _sender.close();
-                }
-            }
-
-        }
-        catch (Sender.SenderClosingException e)
-        {
-            final JMSException jmsException = new JMSException("error closing");
-            jmsException.setLinkedException(e);
-            throw jmsException;
-        }
-    }
-
-    public void send(final Message message) throws JMSException
-    {
-        send(message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    public void send(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
-    {
-        if(_sender == null)
-        {
-            throw new UnsupportedOperationException("No Destination provided");
-        }
-        if(_destination instanceof TemporaryDestination && ((TemporaryDestination)_destination).isDeleted())
-        {
-            throw new IllegalStateException("Destination is deleted");
-        }
-
-
-        //TODO
-        MessageImpl msg;
-        if(message instanceof org.apache.qpid.amqp_1_0.jms.Message)
-        {
-            msg = (MessageImpl) message;
-        }
-        else
-        {
-            msg = _session.convertMessage(message);
-        }
-
-
-
-        msg.setJMSDeliveryMode(deliveryMode);
-        msg.setJMSPriority(priority);
-
-        msg.setJMSDestination(_destination);
-
-        long timestamp = 0l;
-
-        if(!getDisableMessageTimestamp() || ttl != 0)
-        {
-            timestamp = System.currentTimeMillis();
-            msg.setJMSTimestamp(timestamp);
-
-        }
-        if(ttl != 0)
-        {
-            msg.setTtl(UnsignedInteger.valueOf(ttl));
-        }
-        else
-        {
-            msg.setTtl(null);
-        }
-
-        if(!getDisableMessageID() && msg.getMessageId() == null)
-        {
-            final Object messageId = generateMessageId();
-            msg.setMessageId(messageId);
-
-        }
-
-        if(message != msg)
-        {
-            message.setJMSTimestamp(msg.getJMSTimestamp());
-            message.setJMSMessageID(msg.getJMSMessageID());
-            message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
-            message.setJMSPriority(msg.getJMSPriority());
-            message.setJMSExpiration(msg.getJMSExpiration());
-        }
-
-
-        final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
-
-        DispositionAction action = null;
-
-        if(_syncPublish)
-        {
-            action = new DispositionAction(_sender);
-        }
-
-        try
-        {
-            _sender.send(clientMessage, _session.getTxn(), action);
-        }
-        catch (LinkDetachedException e)
-        {
-            JMSException jmsException = new InvalidDestinationException("Sender has been closed");
-            jmsException.setLinkedException(e);
-            throw jmsException;
-        }
-
-        if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
-        {
-            throw new MessageRejectedException("Message was rejected");
-        }
-
-        if(getDestination() != null)
-        {
-            message.setJMSDestination(getDestination());
-        }
-    }
-
-    public void send(final javax.jms.Queue queue, final Message message) throws JMSException
-    {
-        send((Destination)queue, message);
-    }
-
-    public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
-            throws JMSException
-    {
-        send((Destination)queue, message, deliveryMode, priority, ttl);
-    }
-
-    private Object generateMessageId()
-    {
-        UUID uuid = UUID.randomUUID();
-        final String messageIdString = uuid.toString();
-        return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
-    }
-
-    public void send(final Destination destination, final Message message) throws JMSException
-    {
-        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
-            throws JMSException
-    {
-
-        checkClosed();
-        if(destination == null)
-        {
-            send(message, deliveryMode, priority, ttl);
-        }
-        else
-        {
-            if(_destination != null)
-            {
-                throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
-            }
-            else if(!(destination instanceof DestinationImpl))
-            {
-                throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
-            }
-            else if(destination instanceof TemporaryDestination && ((TemporaryDestination)destination).isDeleted())
-            {
-                throw new IllegalStateException("Destination has been deleted");
-            }
-            try
-            {
-                _destination = (DestinationImpl) destination;
-                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
-
-                send(message, deliveryMode, priority, ttl);
-
-                _sender.close();
-
-
-
-            }
-            catch (Sender.SenderCreationException e)
-            {
-                // TODO - refine exception
-                JMSException jmsEx = new JMSException(e.getMessage());
-                jmsEx.initCause(e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-            catch (Sender.SenderClosingException e)
-            {
-                JMSException jmsEx = new JMSException(e.getMessage());
-                jmsEx.initCause(e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-            catch (ConnectionClosedException e)
-            {
-
-                JMSException jmsEx = new JMSException(e.getMessage());
-                jmsEx.initCause(e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-            finally
-            {
-                _sender = null;
-                _destination = null;
-            }
-        }
-    }
-
-    public QueueImpl getQueue() throws JMSException
-    {
-        return (QueueImpl) getDestination();
-    }
-
-    public TopicImpl getTopic() throws JMSException
-    {
-        return (TopicImpl) getDestination();
-    }
-
-    public void publish(final Message message) throws JMSException
-    {
-        send(message);
-    }
-
-    public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
-    {
-        send(message, deliveryMode, priority, ttl);
-    }
-
-    public void publish(final Topic topic, final Message message) throws JMSException
-    {
-        send(topic, message);
-    }
-
-    public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
-            throws JMSException
-    {
-        send(topic, message, deliveryMode, priority, ttl);
-    }
-
-    private static class DispositionAction implements Sender.OutcomeAction
-    {
-        private final Sender _sender;
-        private final Object _lock;
-        private Outcome _outcome;
-
-        public DispositionAction(Sender sender)
-        {
-            _sender = sender;
-            _lock = sender.getEndpoint().getLock();
-        }
-
-        @Override
-        public void onOutcome(Binary deliveryTag, Outcome outcome)
-        {
-            synchronized (_lock)
-            {
-                _outcome = outcome;
-                _lock.notifyAll();
-            }
-        }
-
-        public boolean wasAccepted(long timeout) throws JMSException
-        {
-            synchronized(_lock)
-            {
-                while(_outcome == null && !_sender.getEndpoint().isDetached())
-                {
-                    try
-                    {
-                        _lock.wait(timeout - System.currentTimeMillis());
-                    }
-                    catch (InterruptedException e)
-                    {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-                if(_outcome == null)
-                {
-
-                    if(_sender.getEndpoint().isDetached())
-                    {
-                        throw new JMSException("Link was detached");
-                    }
-                    else
-                    {
-                        throw new JMSException("Timed out waiting for message acceptance");
-                    }
-                }
-                else
-                {
-                    return _outcome instanceof Accepted;
-                }
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
+{
+    private boolean _disableMessageID;
+    private boolean _disableMessageTimestamp;
+    private int _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+    private int _priority = Message.DEFAULT_PRIORITY;
+    private long _timeToLive;
+
+    private DestinationImpl _destination;
+    private SessionImpl _session;
+    private Sender _sender;
+    private boolean _closed;
+    private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+    private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
+
+    protected MessageProducerImpl(final Destination destination,
+                               final SessionImpl session) throws JMSException
+    {
+        if(destination instanceof DestinationImpl)
+        {
+            _destination = (DestinationImpl) destination;
+        }
+        else if(destination != null)
+        {
+            throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+        }
+
+        _session = session;
+
+        if(_destination != null)
+        {
+            try
+            {
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
+            }
+            catch (Sender.SenderCreationException e)
+            {
+                // TODO - refine exception
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            catch (ConnectionClosedException e)
+            {
+
+                // TODO - refine exception
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            _sender.setRemoteErrorListener(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+                                if(exceptionListener != null)
+                                {
+                                    final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+                                    exceptionListener.onException(new JMSException(receiverError.getDescription(),
+                                            receiverError.getCondition().getValue().toString()));
+
+                                }
+                            }
+                            catch (JMSException e)
+                            {
+
+                            }
+                        }
+                    });
+        }
+    }
+
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_closed)
+        {
+            throw new javax.jms.IllegalStateException("Producer closed");
+        }
+    }
+
+    public boolean getDisableMessageID() throws IllegalStateException
+    {
+        checkClosed();
+        return _disableMessageID;
+    }
+
+    public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
+    {
+        checkClosed();
+        _disableMessageID = disableMessageID;
+    }
+
+    public boolean getDisableMessageTimestamp() throws IllegalStateException
+    {
+        checkClosed();
+        return _disableMessageTimestamp;
+    }
+
+    public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
+    {
+        checkClosed();
+        _disableMessageTimestamp = disableMessageTimestamp;
+    }
+
+    public int getDeliveryMode() throws IllegalStateException
+    {
+        checkClosed();
+        return _deliveryMode;
+    }
+
+    public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
+    {
+        checkClosed();
+        _deliveryMode = deliveryMode;
+    }
+
+    public int getPriority() throws IllegalStateException
+    {
+        checkClosed();
+        return _priority;
+    }
+
+    public void setPriority(final int priority) throws IllegalStateException
+    {
+        checkClosed();
+        _priority = priority;
+    }
+
+    public long getTimeToLive() throws IllegalStateException
+    {
+        checkClosed();
+        return _timeToLive;
+    }
+
+    public void setTimeToLive(final long timeToLive) throws IllegalStateException
+    {
+        checkClosed();
+        _timeToLive = timeToLive;
+    }
+
+    public DestinationImpl getDestination() throws JMSException
+    {
+        checkClosed();
+        return _destination;
+    }
+
+    public void close() throws JMSException
+    {
+        try
+        {
+            if(!_closed)
+            {
+                _closed = true;
+                if(_sender != null)
+                {
+                    _sender.close();
+                }
+            }
+
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            final JMSException jmsException = new JMSException("error closing");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+    }
+
+    public void send(final Message message) throws JMSException
+    {
+        send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    public void send(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+    {
+        if(_sender == null)
+        {
+            throw new UnsupportedOperationException("No Destination provided");
+        }
+        if(_destination instanceof TemporaryDestination && ((TemporaryDestination)_destination).isDeleted())
+        {
+            throw new IllegalStateException("Destination is deleted");
+        }
+
+
+        //TODO
+        MessageImpl msg;
+        if(message instanceof org.apache.qpid.amqp_1_0.jms.Message)
+        {
+            msg = (MessageImpl) message;
+        }
+        else
+        {
+            msg = _session.convertMessage(message);
+        }
+
+
+
+        msg.setJMSDeliveryMode(deliveryMode);
+        msg.setJMSPriority(priority);
+
+        msg.setJMSDestination(_destination);
+
+        long timestamp = 0l;
+
+        if(!getDisableMessageTimestamp() || ttl != 0)
+        {
+            timestamp = System.currentTimeMillis();
+            msg.setJMSTimestamp(timestamp);
+
+        }
+        if(ttl != 0)
+        {
+            msg.setTtl(UnsignedInteger.valueOf(ttl));
+        }
+        else
+        {
+            msg.setTtl(null);
+        }
+
+        if(!getDisableMessageID() && msg.getMessageId() == null)
+        {
+            final Object messageId = generateMessageId();
+            msg.setMessageId(messageId);
+
+        }
+
+        if(message != msg)
+        {
+            message.setJMSTimestamp(msg.getJMSTimestamp());
+            message.setJMSMessageID(msg.getJMSMessageID());
+            message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
+            message.setJMSPriority(msg.getJMSPriority());
+            message.setJMSExpiration(msg.getJMSExpiration());
+        }
+
+
+        final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
+
+        DispositionAction action = null;
+
+        if(_syncPublish)
+        {
+            action = new DispositionAction(_sender);
+        }
+
+        try
+        {
+            _sender.send(clientMessage, _session.getTxn(), action);
+        }
+        catch (LinkDetachedException e)
+        {
+            JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+
+        if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+        {
+            throw new MessageRejectedException("Message was rejected");
+        }
+
+        if(getDestination() != null)
+        {
+            message.setJMSDestination(getDestination());
+        }
+    }
+
+    public void send(final javax.jms.Queue queue, final Message message) throws JMSException
+    {
+        send((Destination)queue, message);
+    }
+
+    public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
+            throws JMSException
+    {
+        send((Destination)queue, message, deliveryMode, priority, ttl);
+    }
+
+    private Object generateMessageId()
+    {
+        UUID uuid = UUID.randomUUID();
+        final String messageIdString = uuid.toString();
+        return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
+    }
+
+    public void send(final Destination destination, final Message message) throws JMSException
+    {
+        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
+            throws JMSException
+    {
+
+        checkClosed();
+        if(destination == null)
+        {
+            send(message, deliveryMode, priority, ttl);
+        }
+        else
+        {
+            if(_destination != null)
+            {
+                throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+            }
+            else if(!(destination instanceof DestinationImpl))
+            {
+                throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+            }
+            else if(destination instanceof TemporaryDestination && ((TemporaryDestination)destination).isDeleted())
+            {
+                throw new IllegalStateException("Destination has been deleted");
+            }
+            try
+            {
+                _destination = (DestinationImpl) destination;
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
+
+                send(message, deliveryMode, priority, ttl);
+
+                _sender.close();
+
+
+
+            }
+            catch (Sender.SenderCreationException e)
+            {
+                // TODO - refine exception
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            catch (Sender.SenderClosingException e)
+            {
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            catch (ConnectionClosedException e)
+            {
+
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            finally
+            {
+                _sender = null;
+                _destination = null;
+            }
+        }
+    }
+
+    public QueueImpl getQueue() throws JMSException
+    {
+        return (QueueImpl) getDestination();
+    }
+
+    public TopicImpl getTopic() throws JMSException
+    {
+        return (TopicImpl) getDestination();
+    }
+
+    public void publish(final Message message) throws JMSException
+    {
+        send(message);
+    }
+
+    public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+    {
+        send(message, deliveryMode, priority, ttl);
+    }
+
+    public void publish(final Topic topic, final Message message) throws JMSException
+    {
+        send(topic, message);
+    }
+
+    public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
+            throws JMSException
+    {
+        send(topic, message, deliveryMode, priority, ttl);
+    }
+
+    private static class DispositionAction implements Sender.OutcomeAction
+    {
+        private final Sender _sender;
+        private final Object _lock;
+        private Outcome _outcome;
+
+        public DispositionAction(Sender sender)
+        {
+            _sender = sender;
+            _lock = sender.getEndpoint().getLock();
+        }
+
+        @Override
+        public void onOutcome(Binary deliveryTag, Outcome outcome)
+        {
+            synchronized (_lock)
+            {
+                _outcome = outcome;
+                _lock.notifyAll();
+            }
+        }
+
+        public boolean wasAccepted(long timeout) throws JMSException
+        {
+            synchronized(_lock)
+            {
+                while(_outcome == null && !_sender.getEndpoint().isDetached())
+                {
+                    try
+                    {
+                        _lock.wait(timeout - System.currentTimeMillis());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                if(_outcome == null)
+                {
+
+                    if(_sender.getEndpoint().isDetached())
+                    {
+                        throw new JMSException("Link was detached");
+                    }
+                    else
+                    {
+                        throw new JMSException("Timed out waiting for message acceptance");
+                    }
+                }
+                else
+                {
+                    return _outcome instanceof Accepted;
+                }
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Wed Sep 25 14:33:06 2013
@@ -1,161 +1,161 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
-{
-    static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
-
-    static final Data NULL_OBJECT_DATA;
-    static 
-    {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try
-        {
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(null);
-            oos.flush();
-            oos.close();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
-    }
-
-    private Data _objectData = NULL_OBJECT_DATA;
-
-    protected ObjectMessageImpl(Header header,
-                                MessageAnnotations messageAnnotations,
-                                Properties properties,
-                                ApplicationProperties appProperties,
-                                Data dataSection,
-                                Footer footer,
-                                SessionImpl session)
-    {
-        super(header, messageAnnotations, properties, appProperties, footer, session);
-        getProperties().setContentType(CONTENT_TYPE);
-        Serializable serializable = null;
-        _objectData = dataSection;
-
-    }
-
-    protected ObjectMessageImpl(final SessionImpl session)
-    {
-        super(new Header(), new MessageAnnotations(new HashMap()),
-              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
-              session);
-        getProperties().setContentType(CONTENT_TYPE);
-    }
-
-    public void setObject(final Serializable serializable) throws MessageNotWriteableException
-    {
-        checkWritable();
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try
-        {
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(serializable);
-            oos.flush();
-            oos.close();
-
-            _objectData = new Data(new Binary(baos.toByteArray()));
-
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();  //TODO
-        }
-    }
-
-    public Serializable getObject() throws JMSException
-    {
-
-        if(_objectData == null)
-        {
-            return null;
-        }
-
-        Binary data = _objectData.getValue();
-
-        try
-        {
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
-            return (Serializable) ois.readObject();
-        }
-        catch (IOException e)
-        {
-            JMSException jmsException = new JMSException(e.getMessage());
-            jmsException.setLinkedException(e);
-            throw jmsException;
-        }
-        catch (ClassNotFoundException e)
-        {
-
-            JMSException jmsException = new JMSException(e.getMessage());
-            jmsException.setLinkedException(e);
-            throw jmsException;
-        }
-
-    }
-
-    @Override
-    public void clearBody() throws JMSException
-    {
-        super.clearBody();
-        _objectData = null;
-    }
-
-    @Override Collection<Section> getSections()
-    {
-        List<Section> sections = new ArrayList<Section>();
-        sections.add(getHeader());
-        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
-        {
-            sections.add(getMessageAnnotations());
-        }
-        sections.add(getProperties());
-        sections.add(getApplicationProperties());
-
-        sections.add(_objectData);
-
-        sections.add(getFooter());
-        return sections;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.*;
+
+public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
+{
+    static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
+
+    static final Data NULL_OBJECT_DATA;
+    static 
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try
+        {
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(null);
+            oos.flush();
+            oos.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
+    }
+
+    private Data _objectData = NULL_OBJECT_DATA;
+
+    protected ObjectMessageImpl(Header header,
+                                MessageAnnotations messageAnnotations,
+                                Properties properties,
+                                ApplicationProperties appProperties,
+                                Data dataSection,
+                                Footer footer,
+                                SessionImpl session)
+    {
+        super(header, messageAnnotations, properties, appProperties, footer, session);
+        getProperties().setContentType(CONTENT_TYPE);
+        Serializable serializable = null;
+        _objectData = dataSection;
+
+    }
+
+    protected ObjectMessageImpl(final SessionImpl session)
+    {
+        super(new Header(), new MessageAnnotations(new HashMap()),
+              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+              session);
+        getProperties().setContentType(CONTENT_TYPE);
+    }
+
+    public void setObject(final Serializable serializable) throws MessageNotWriteableException
+    {
+        checkWritable();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try
+        {
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(serializable);
+            oos.flush();
+            oos.close();
+
+            _objectData = new Data(new Binary(baos.toByteArray()));
+
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();  //TODO
+        }
+    }
+
+    public Serializable getObject() throws JMSException
+    {
+
+        if(_objectData == null)
+        {
+            return null;
+        }
+
+        Binary data = _objectData.getValue();
+
+        try
+        {
+            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
+            return (Serializable) ois.readObject();
+        }
+        catch (IOException e)
+        {
+            JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+        catch (ClassNotFoundException e)
+        {
+
+            JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+
+    }
+
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _objectData = null;
+    }
+
+    @Override Collection<Section> getSections()
+    {
+        List<Section> sections = new ArrayList<Section>();
+        sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
+        sections.add(getProperties());
+        sections.add(getApplicationProperties());
+
+        sections.add(_objectData);
+
+        sections.add(getFooter());
+        return sections;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java Wed Sep 25 14:33:06 2013
@@ -1,202 +1,202 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-
-public class QueueBrowserImpl implements QueueBrowser
-{
-    private static final String JMS_SELECTOR = "jms-selector";
-    private QueueImpl _queue;
-    private String _selector;
-    private final SessionImpl _session;
-    private Map<Symbol, Filter> _filters;
-    private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
-    private boolean _closed;
-
-    QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
-    {
-        _queue = queue;
-        _selector = selector;
-        _session = session;
-
-
-        if(selector == null || selector.trim().equals(""))
-        {
-            _filters = null;
-        }
-        else
-        {
-            _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
-            // We do this just to have the server validate the filter..
-            new MessageEnumeration().close();
-        }
-    }
-
-    public QueueImpl getQueue()
-    {
-        return _queue;
-    }
-
-    public String getMessageSelector()
-    {
-        return _selector;
-    }
-
-    public Enumeration getEnumeration() throws JMSException
-    {
-        if(_closed)
-        {
-            throw new IllegalStateException("Browser has been closed");
-        }
-        return new MessageEnumeration();
-    }
-
-    public void close() throws JMSException
-    {
-        _closed = true;
-        for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
-        {
-            me.close();
-        }
-    }
-
-    private final class MessageEnumeration implements Enumeration<MessageImpl>
-    {
-        private Receiver _receiver;
-        private MessageImpl _nextElement;
-        private boolean _needNext = true;
-
-        MessageEnumeration() throws JMSException
-        {
-            try
-            {
-                _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
-                        StdDistMode.COPY,
-                        AcknowledgeMode.AMO, null,
-                        false,
-                        _filters, null);
-                _receiver.setCredit(UnsignedInteger.valueOf(100), true);
-            }
-            catch(ConnectionErrorException e)
-            {
-                org.apache.qpid.amqp_1_0.type.transport.Error error = e.getRemoteError();
-                if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
-                {
-                    throw new InvalidSelectorException(e.getMessage());
-                }
-                else
-                {
-                    throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
-                }
-
-            }
-            _enumerations.add(this);
-
-        }
-
-        public void close()
-        {
-            _enumerations.remove(this);
-            _receiver.close();
-            _receiver = null;
-        }
-
-        @Override
-        public boolean hasMoreElements()
-        {
-            if( _receiver == null )
-            {
-                return false;
-            }
-            if( _needNext )
-            {
-                _needNext = false;
-                _nextElement = createJMSMessage(_receiver.receive(0L));
-                if( _nextElement == null )
-                {
-                    // Drain to verify there really are no more messages.
-                    _receiver.drain();
-                    _receiver.drainWait();
-                    _nextElement = createJMSMessage(_receiver.receive(0L));
-                    if( _nextElement == null )
-                    {
-                        close();
-                    }
-                    else
-                    {
-                        // there are still more messages, open up the credit window again..
-                        _receiver.clearDrain();
-                    }
-                }
-            }
-            return _nextElement != null;
-        }
-
-        @Override
-        public MessageImpl nextElement()
-        {
-            if( hasMoreElements() )
-            {
-                MessageImpl message = _nextElement;
-                _nextElement = null;
-                _needNext = true;
-                return message;
-            }
-            else
-            {
-                throw new NoSuchElementException();
-            }
-        }
-    }
-
-    MessageImpl createJMSMessage(final Message msg)
-    {
-        if(msg != null)
-        {
-            final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
-            message.setFromQueue(true);
-            message.setFromTopic(false);
-            return message;
-        }
-        else
-        {
-            return null;
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+
+public class QueueBrowserImpl implements QueueBrowser
+{
+    private static final String JMS_SELECTOR = "jms-selector";
+    private QueueImpl _queue;
+    private String _selector;
+    private final SessionImpl _session;
+    private Map<Symbol, Filter> _filters;
+    private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+    private boolean _closed;
+
+    QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
+    {
+        _queue = queue;
+        _selector = selector;
+        _session = session;
+
+
+        if(selector == null || selector.trim().equals(""))
+        {
+            _filters = null;
+        }
+        else
+        {
+            _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+            // We do this just to have the server validate the filter..
+            new MessageEnumeration().close();
+        }
+    }
+
+    public QueueImpl getQueue()
+    {
+        return _queue;
+    }
+
+    public String getMessageSelector()
+    {
+        return _selector;
+    }
+
+    public Enumeration getEnumeration() throws JMSException
+    {
+        if(_closed)
+        {
+            throw new IllegalStateException("Browser has been closed");
+        }
+        return new MessageEnumeration();
+    }
+
+    public void close() throws JMSException
+    {
+        _closed = true;
+        for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+        {
+            me.close();
+        }
+    }
+
+    private final class MessageEnumeration implements Enumeration<MessageImpl>
+    {
+        private Receiver _receiver;
+        private MessageImpl _nextElement;
+        private boolean _needNext = true;
+
+        MessageEnumeration() throws JMSException
+        {
+            try
+            {
+                _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
+                        StdDistMode.COPY,
+                        AcknowledgeMode.AMO, null,
+                        false,
+                        _filters, null);
+                _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+            }
+            catch(ConnectionErrorException e)
+            {
+                org.apache.qpid.amqp_1_0.type.transport.Error error = e.getRemoteError();
+                if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+                {
+                    throw new InvalidSelectorException(e.getMessage());
+                }
+                else
+                {
+                    throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+                }
+
+            }
+            _enumerations.add(this);
+
+        }
+
+        public void close()
+        {
+            _enumerations.remove(this);
+            _receiver.close();
+            _receiver = null;
+        }
+
+        @Override
+        public boolean hasMoreElements()
+        {
+            if( _receiver == null )
+            {
+                return false;
+            }
+            if( _needNext )
+            {
+                _needNext = false;
+                _nextElement = createJMSMessage(_receiver.receive(0L));
+                if( _nextElement == null )
+                {
+                    // Drain to verify there really are no more messages.
+                    _receiver.drain();
+                    _receiver.drainWait();
+                    _nextElement = createJMSMessage(_receiver.receive(0L));
+                    if( _nextElement == null )
+                    {
+                        close();
+                    }
+                    else
+                    {
+                        // there are still more messages, open up the credit window again..
+                        _receiver.clearDrain();
+                    }
+                }
+            }
+            return _nextElement != null;
+        }
+
+        @Override
+        public MessageImpl nextElement()
+        {
+            if( hasMoreElements() )
+            {
+                MessageImpl message = _nextElement;
+                _nextElement = null;
+                _needNext = true;
+                return message;
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
+    MessageImpl createJMSMessage(final Message msg)
+    {
+        if(msg != null)
+        {
+            final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
+            message.setFromQueue(true);
+            message.setFromTopic(false);
+            return message;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java Wed Sep 25 14:33:06 2013
@@ -1,48 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueConnection;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.ServerSessionPool;
-
-public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
-{
-    QueueConnectionImpl(String host, int port, String username, String password, String clientId)
-            throws JMSException
-    {
-        super(host, port, username, password, clientId);
-    }
-
-    public QueueSessionImpl createQueueSession(final boolean b, final int i) throws JMSException
-    {
-        return null;  //TODO
-    }
-
-    public ConnectionConsumer createConnectionConsumer(final Queue queue,
-                                                       final String s,
-                                                       final ServerSessionPool serverSessionPool,
-                                                       final int i) throws JMSException
-    {
-        return null;  //TODO
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueConnection;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.ServerSessionPool;
+
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+    QueueConnectionImpl(String host, int port, String username, String password, String clientId)
+            throws JMSException
+    {
+        super(host, port, username, password, clientId);
+    }
+
+    public QueueSessionImpl createQueueSession(final boolean b, final int i) throws JMSException
+    {
+        return null;  //TODO
+    }
+
+    public ConnectionConsumer createConnectionConsumer(final Queue queue,
+                                                       final String s,
+                                                       final ServerSessionPool serverSessionPool,
+                                                       final int i) throws JMSException
+    {
+        return null;  //TODO
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java Wed Sep 25 14:33:06 2013
@@ -1,56 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.Queue;
-
-import java.util.WeakHashMap;
-
-public class QueueImpl extends DestinationImpl implements Queue
-{
-    private static final WeakHashMap<String, QueueImpl> QUEUE_CACHE =
-        new WeakHashMap<String, QueueImpl>();
-
-    public QueueImpl(String address)
-    {
-        super(address);
-    }
-
-    public String getQueueName()
-    {
-        return getAddress();
-    }
-
-    public static synchronized QueueImpl createQueue(final String address)
-    {
-        QueueImpl queue = QUEUE_CACHE.get(address);
-        if(queue == null)
-        {
-            queue = new QueueImpl(address);
-            QUEUE_CACHE.put(address, queue);
-        }
-        return queue;
-    }
-
-    public static QueueImpl valueOf(String address)
-    {
-        return address == null ? null : createQueue(address);
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Queue;
+
+import java.util.WeakHashMap;
+
+/**
+ * Queue destination identified by its address.
+ *
+ * Instances handed out by {@link #createQueue(String)} and {@link #valueOf(String)} are
+ * shared through a static cache keyed by address; the public constructor bypasses that cache.
+ */
+public class QueueImpl extends DestinationImpl implements Queue
+{
+    // NOTE(review): a WeakHashMap entry is only dropped once its key is no longer strongly
+    // reachable; if DestinationImpl retains the very same String instance, each cached queue
+    // keeps its own key alive - confirm whether entries can ever be collected.
+    private static final WeakHashMap<String, QueueImpl> QUEUE_CACHE =
+        new WeakHashMap<String, QueueImpl>();
+
+    /**
+     * Creates an uncached queue for the given address.
+     *
+     * @param address passed unchanged to the DestinationImpl constructor
+     */
+    public QueueImpl(String address)
+    {
+        super(address);
+    }
+
+    /**
+     * @return the queue name, which is simply the value of getAddress()
+     */
+    public String getQueueName()
+    {
+        return getAddress();
+    }
+
+    /**
+     * Returns the cached queue for the address, creating and caching one on first use.
+     * The method is synchronized on the class, which serialises all access to the cache made here.
+     *
+     * @param address the cache key and the address of a newly created queue
+     * @return the cached or newly created queue, never null
+     */
+    public static synchronized QueueImpl createQueue(final String address)
+    {
+        final QueueImpl cached = QUEUE_CACHE.get(address);
+        if(cached != null)
+        {
+            return cached;
+        }
+
+        final QueueImpl created = new QueueImpl(address);
+        QUEUE_CACHE.put(address, created);
+        return created;
+    }
+
+    /**
+     * Null-tolerant variant of {@link #createQueue(String)}.
+     *
+     * @param address the queue address, may be null
+     * @return null when the address is null, otherwise the result of createQueue(address)
+     */
+    public static QueueImpl valueOf(String address)
+    {
+        if(address == null)
+        {
+            return null;
+        }
+        return createQueue(address);
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Wed Sep 25 14:33:06 2013
@@ -1,56 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import javax.jms.JMSException;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-
-public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
-{
-    QueueReceiverImpl(final QueueImpl destination,
-                      final SessionImpl session,
-                      final String selector,
-                      final boolean noLocal)
-            throws JMSException
-    {
-        super(destination, session, selector, noLocal);
-        setQueueConsumer(true);
-    }
-
-    protected Receiver createClientReceiver() throws JMSException
-    {
-        try
-        {
-            return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
-        }
-        catch (ConnectionErrorException e)
-        {
-            throw new JMSException(e.getMessage(), e.getRemoteError().getCondition().toString());
-        }
-    }
-
-    public Queue getQueue() throws JMSException
-    {
-        return (QueueImpl) getDestination();
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import javax.jms.JMSException;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+
+/**
+ * Message consumer bound to a queue; the QueueReceiver flavour of MessageConsumerImpl.
+ */
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+    /**
+     * Delegates to the MessageConsumerImpl constructor and then flags this consumer as a
+     * queue consumer via setQueueConsumer(true).
+     *
+     * @throws JMSException if the MessageConsumerImpl constructor throws it
+     */
+    QueueReceiverImpl(final QueueImpl destination,
+                      final SessionImpl session,
+                      final String selector,
+                      final boolean noLocal)
+            throws JMSException
+    {
+        super(destination, session, selector, noLocal);
+        setQueueConsumer(true);
+    }
+
+    /**
+     * Creates the underlying client receiver by asking the client session for a "moving"
+     * receiver on the address the session derives from this consumer's destination.
+     *
+     * @return the newly created client receiver
+     * @throws JMSException if the client reports a ConnectionErrorException; the message is
+     *         copied, the remote error condition becomes the error code, and the original
+     *         exception is attached as both linked exception and cause
+     */
+    protected Receiver createClientReceiver() throws JMSException
+    {
+        try
+        {
+            return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
+        }
+        catch (ConnectionErrorException e)
+        {
+            // NOTE(review): assumes getRemoteError() and its condition are non-null here - confirm.
+            final JMSException jmsException =
+                    new JMSException(e.getMessage(), e.getRemoteError().getCondition().toString());
+            // Keep the original failure reachable so its stack trace is not lost.
+            jmsException.setLinkedException(e);
+            jmsException.initCause(e);
+            throw jmsException;
+        }
+    }
+
+    /**
+     * @return this consumer's destination, cast to QueueImpl
+     */
+    public Queue getQueue() throws JMSException
+    {
+        return (QueueImpl) getDestination();
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java Wed Sep 25 14:33:06 2013
@@ -1,36 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueSender;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
-{
-    protected QueueSenderImpl(final Destination destination, final SessionImpl session)
-            throws JMSException
-    {
-        super(destination, session);
-    }
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * QueueSender flavour of MessageProducerImpl. Declares no members of its own besides
+ * the constructor.
+ */
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+    /**
+     * Passes the destination and session unchanged to the MessageProducerImpl constructor.
+     *
+     * @throws JMSException if the MessageProducerImpl constructor throws it
+     */
+    protected QueueSenderImpl(final Destination destination, final SessionImpl session)
+            throws JMSException
+    {
+        super(destination, session);
+    }
+
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java Wed Sep 25 14:33:06 2013
@@ -1,57 +1,57 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueSession;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-public class QueueSessionImpl extends SessionImpl implements QueueSession
-{
-    protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
-            throws JMSException
-    {
-        super(connection, acknowledgeMode);
-        setQueueSession(true);
-    }
-
-    public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
-    {
-        return createReceiver(queue, null);
-    }
-
-    public QueueReceiverImpl createReceiver(final Queue queue, final String selector) throws JMSException
-    {
-        // TODO - assert queue is a queueimpl and throw relevant JMS Exception
-        final QueueReceiverImpl messageConsumer;
-        synchronized(getClientSession().getEndpoint().getLock())
-        {
-            messageConsumer = new QueueReceiverImpl((QueueImpl)queue, this, selector, false);
-            addConsumer(messageConsumer);
-        }
-        return messageConsumer;
-
-    }
-
-    public QueueSenderImpl createSender(final Queue queue) throws JMSException
-    {
-        return null;  //TODO
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Queue-domain session: a SessionImpl that is marked as a queue session and can create
+ * queue receivers. Sender creation is still an unimplemented stub.
+ */
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+    /**
+     * Delegates to the SessionImpl constructor and then marks the session as a queue
+     * session via setQueueSession(true).
+     *
+     * @throws JMSException if the SessionImpl constructor throws it
+     */
+    protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
+            throws JMSException
+    {
+        super(connection, acknowledgeMode);
+        setQueueSession(true);
+    }
+
+    /**
+     * Creates a receiver for the queue without a message selector.
+     *
+     * @see #createReceiver(Queue, String)
+     */
+    public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
+    {
+        return createReceiver(queue, null);
+    }
+
+    /**
+     * Creates a receiver for the queue and registers it with this session.
+     *
+     * @param queue    the queue to consume from; a non-null value must be a QueueImpl
+     * @param selector the message selector handed to the receiver, may be null
+     * @return the newly created and registered receiver
+     * @throws javax.jms.InvalidDestinationException if queue is non-null but not a QueueImpl
+     * @throws JMSException if the receiver cannot be created
+     */
+    public QueueReceiverImpl createReceiver(final Queue queue, final String selector) throws JMSException
+    {
+        // Reject foreign Queue implementations with a JMS exception rather than letting the
+        // cast below fail with a ClassCastException. Null is passed through as before.
+        if(queue != null && !(queue instanceof QueueImpl))
+        {
+            throw new javax.jms.InvalidDestinationException(
+                    "Unsupported Queue implementation: " + queue.getClass().getName());
+        }
+
+        final QueueReceiverImpl messageConsumer;
+        // Construction and registration happen together under the endpoint lock.
+        synchronized(getClientSession().getEndpoint().getLock())
+        {
+            messageConsumer = new QueueReceiverImpl((QueueImpl)queue, this, selector, false);
+            addConsumer(messageConsumer);
+        }
+        return messageConsumer;
+    }
+
+    /**
+     * Not yet implemented: always returns null and ignores the queue.
+     *
+     * @return always null until this stub is implemented
+     */
+    public QueueSenderImpl createSender(final Queue queue) throws JMSException
+    {
+        return null;  //TODO
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org