You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:49 UTC

[25/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
new file mode 100644
index 0000000..880960f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.MessageConsumer;
+
+
+/**
+ * Callback used internally to signal that a consumer has a message ready.
+ */
+public interface JmsMessageAvailableListener {
+
+    /**
+     * Notifies the listener that a message can now be received.
+     *
+     * @param consumer
+     *        the MessageConsumer that has a message waiting to be received.
+     */
+    void onMessageAvailable(MessageConsumer consumer);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
new file mode 100644
index 0000000..07cba2a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.apache.qpid.jms.util.PriorityMessageQueue;
+
+/**
+ * Implementation of a JMS MessageConsumer.
+ */
+public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher {
+
+    protected final JmsSession session;
+    protected final JmsConnection connection;
+    protected JmsConsumerInfo consumerInfo;
+    protected final int acknowledgementMode;
+    protected final AtomicBoolean closed = new AtomicBoolean();
+    protected boolean started;
+    protected MessageListener messageListener;
+    protected JmsMessageAvailableListener availableListener;
+    protected final MessageQueue messageQueue;
+    protected final Lock lock = new ReentrantLock();
+    protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
+    protected final AtomicBoolean delivered = new AtomicBoolean();
+
+    /**
+     * Create a non-durable MessageConsumer
+     *
+     * @param consumerId
+     * @param session
+     * @param destination
+     * @param selector
+     * @param noLocal
+     * @throws JMSException
+     */
+    protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
+        String selector, boolean noLocal) throws JMSException {
+        this(consumerId, session, destination, null, selector, noLocal);
+    }
+
+    /**
+     * Create a MessageConsumer which could be durable.
+     *
+     * @param consumerId
+     * @param session
+     * @param destination
+     * @param name
+     * @param selector
+     * @param noLocal
+     * @throws JMSException
+     */
+    protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
+                                 String name, String selector, boolean noLocal) throws JMSException {
+        this.session = session;
+        this.connection = session.getConnection();
+        this.acknowledgementMode = session.acknowledgementMode();
+
+        if (connection.isMessagePrioritySupported()) {
+            this.messageQueue = new PriorityMessageQueue();
+        } else {
+            this.messageQueue = new FifoMessageQueue();
+        }
+
+        JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy();
+
+        this.consumerInfo = new JmsConsumerInfo(consumerId);
+        this.consumerInfo.setClientId(connection.getClientID());
+        this.consumerInfo.setSelector(selector);
+        this.consumerInfo.setSubscriptionName(name);
+        this.consumerInfo.setDestination(destination);
+        this.consumerInfo.setAcknowledgementMode(acknowledgementMode);
+        this.consumerInfo.setNoLocal(noLocal);
+        this.consumerInfo.setBrowser(isBrowser());
+        this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
+
+        try {
+            this.consumerInfo = session.getConnection().createResource(consumerInfo);
+        } catch (JMSException ex) {
+            throw ex;
+        }
+    }
+
+    public void init() throws JMSException {
+        session.add(this);
+        try {
+            session.getConnection().startResource(consumerInfo);
+        } catch (JMSException ex) {
+            session.remove(this);
+            throw ex;
+        }
+    }
+
+    /**
+     * Closes the consumer; if it delivered messages in an active TX the close waits for TX end.
+     * @see javax.jms.MessageConsumer#close()
+     */
+    @Override
+    public void close() throws JMSException {
+        if (closed.get()) {
+            return;
+        }
+        if (!delivered.get() || !session.getTransactionContext().isInTransaction()) {
+            doClose();
+            return;
+        }
+        session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {
+            @Override
+            public void afterCommit() throws Exception {
+                doClose();
+            }
+            @Override
+            public void afterRollback() throws Exception {
+                doClose();
+            }
+        });
+    }
+
+    /**
+     * Called to initiate shutdown of Consumer resources and request that the remote
+     * peer remove the registered consumer.
+     *
+     * @throws JMSException
+     */
+    protected void doClose() throws JMSException {
+        shutdown();
+        this.connection.destroyResource(consumerInfo);
+    }
+
+    /**
+     * Called to release all consumer resources without requiring a destroy request
+     * to be sent to the remote peer.  This is most commonly needed when the parent
+     * Session is closing.
+     *
+     * @throws JMSException
+     */
+    protected void shutdown() throws JMSException {
+        if (closed.compareAndSet(false, true)) {
+            this.session.remove(this);
+        }
+    }
+
+    /**
+     * @return a Message or null if closed during the operation
+     * @throws JMSException
+     * @see javax.jms.MessageConsumer#receive()
+     */
+    @Override
+    public Message receive() throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        sendPullCommand(0);
+
+        try {
+            return copy(ack(this.messageQueue.dequeue(-1)));
+        } catch (Exception e) {
+            throw JmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param timeout
+     *        wait limit in milliseconds; zero waits without limit, negative returns null at once.
+     * @return a Message or null when none was obtained
+     * @throws JMSException if the consumer is closed, the wait is interrupted or the ack fails
+     * @see javax.jms.MessageConsumer#receive(long)
+     */
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        sendPullCommand(timeout);
+        if (timeout < 0) {
+            return null;
+        }
+        try {
+            // JMS: a zero timeout never expires, so use the blocking dequeue(-1) form as receive() does.
+            return copy(ack(this.messageQueue.dequeue(timeout == 0 ? -1 : timeout)));
+        } catch (InterruptedException e) {
+            throw JmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @return a Message or null
+     * @throws JMSException
+     * @see javax.jms.MessageConsumer#receiveNoWait()
+     */
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        sendPullCommand(-1);
+
+        return copy(ack(this.messageQueue.dequeueNoWait()));
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (this.closed.get()) {
+            throw new IllegalStateException("The MessageConsumer is closed");
+        }
+    }
+
+    JmsMessage copy(final JmsInboundMessageDispatch envelope) throws JMSException {
+        if (envelope == null || envelope.getMessage() == null) {
+            return null;
+        }
+        return envelope.getMessage().copy();
+    }
+
+    JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException {
+        if (envelope != null && envelope.getMessage() != null) {
+            JmsMessage message = envelope.getMessage();
+            if (message.getAcknowledgeCallback() != null || session.isTransacted()) {
+                // Message has been received by the app.. expand the credit
+                // window so that we receive more messages.
+                session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+            } else {
+                doAck(envelope);
+            }
+            // Tags that we have delivered and can't close if in a TX Session.
+            delivered.set(true);
+        }
+        return envelope;
+    }
+
+    private void doAck(final JmsInboundMessageDispatch envelope) throws JMSException {
+        checkClosed();
+        try {
+            session.acknowledge(envelope, ACK_TYPE.CONSUMED);
+        } catch (JMSException ex) {
+            session.onException(ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Queues a newly dispatched message and then either schedules delivery to the registered
+     * MessageListener (when started) or notifies the message-available listener.
+     *
+     * @param envelope
+     *        the dispatch envelope carrying the newly arrived message.
+     */
+    @Override
+    public void onMessage(final JmsInboundMessageDispatch envelope) {
+        lock.lock();
+        try {
+            if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+                envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (session.isClosed()) {
+                            throw new javax.jms.IllegalStateException("Session closed.");
+                        }
+                        session.acknowledge();
+                        return null;
+                    }
+                });
+            }
+            this.messageQueue.enqueue(envelope);
+        } finally {
+            lock.unlock();
+        }
+
+        if (this.messageListener != null && this.started) {
+            session.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    JmsInboundMessageDispatch envelope;
+                    while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) {
+                        try {
+                            messageListener.onMessage(copy(ack(envelope)));
+                        } catch (Exception e) {
+                            session.getConnection().onException(e);
+                        }
+                    }
+                }
+            });
+        } else {
+            if (availableListener != null) {
+                availableListener.onMessageAvailable(this);
+            }
+        }
+    }
+
+    public void start() {
+        lock.lock();
+        try {
+            this.started = true;
+            this.messageQueue.start();
+            drainMessageQueueToListener();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void stop() {
+        lock.lock();
+        try {
+            this.started = false;
+            this.messageQueue.stop();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void drainMessageQueueToListener() {
+        MessageListener listener = this.messageListener;
+        if (listener != null) {
+            if (!this.messageQueue.isEmpty()) {
+                List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll();
+                for (JmsInboundMessageDispatch envelope : drain) {
+                    try {
+                        listener.onMessage(copy(ack(envelope)));
+                    } catch (Exception e) {
+                        session.getConnection().onException(e);
+                    }
+                }
+                drain.clear();
+            }
+        }
+    }
+
+    /**
+     * @return the id
+     */
+    public JmsConsumerId getConsumerId() {
+        return this.consumerInfo.getConsumerId();
+    }
+
+    /**
+     * @return the Destination
+     */
+    public JmsDestination getDestination() {
+        return this.consumerInfo.getDestination();
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        checkClosed();
+        return this.messageListener;
+    }
+
+    /**
+     * @param listener the listener to deliver to; queued messages are handed to it right away
+     * @throws JMSException if the consumer is closed or its prefetch size is zero
+     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        checkClosed();
+        if (consumerInfo.getPrefetchSize() == 0) {
+            throw new JMSException("Illegal prefetch size of zero. This setting is not supported " +
+                                   "for asynchronous consumers please set a value of at least 1");
+        }
+        this.messageListener = listener;
+        drainMessageQueueToListener();
+    }
+
+    /**
+     * @return the Message Selector
+     * @throws JMSException
+     * @see javax.jms.MessageConsumer#getMessageSelector()
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        checkClosed();
+        return this.consumerInfo.getSelector();
+    }
+
+    /**
+     * Gets the configured prefetch size for this consumer.
+     * @return the prefetch size configuration for this consumer.
+     */
+    public int getPrefetchSize() {
+        return this.consumerInfo.getPrefetchSize();
+    }
+
+    protected void checkMessageListener() throws JMSException {
+        session.checkMessageListener();
+    }
+
+    boolean hasMessageListener() {
+        return this.messageListener != null;
+    }
+
+    boolean isUsingDestination(JmsDestination destination) {
+        return this.consumerInfo.getDestination().equals(destination);
+    }
+
+    protected int getMessageQueueSize() {
+        return this.messageQueue.size();
+    }
+
+    public boolean getNoLocal() throws IllegalStateException {
+        return this.consumerInfo.isNoLocal();
+    }
+
+    public boolean isDurableSubscription() {
+        return false;
+    }
+
+    public boolean isBrowser() {
+        return false;
+    }
+
+    @Override
+    public void setAvailableListener(JmsMessageAvailableListener availableListener) {
+        this.availableListener = availableListener;
+    }
+
+    @Override
+    public JmsMessageAvailableListener getAvailableListener() {
+        return availableListener;
+    }
+
+    protected void onConnectionInterrupted() {
+        messageQueue.clear();
+    }
+
+    protected void onConnectionRecovery(Provider provider) throws Exception {
+        ProviderFuture request = new ProviderFuture();
+        provider.create(consumerInfo, request);
+        request.sync();
+    }
+
+    protected void onConnectionRecovered(Provider provider) throws Exception {
+        ProviderFuture request = new ProviderFuture();
+        provider.start(consumerInfo, request);
+        request.sync();
+    }
+
+    protected void onConnectionRestored() {
+    }
+
+    /**
+     * Asks the connection to pull a message for this consumer, but only when the local queue is
+     * empty and the consumer is a browser or has a prefetch size of zero.  The timeout is a hint:
+     * some providers will not honor it and the pull stays active until a message is dispatched.
+     *
+     * The timeout value can be one of:
+     * <ul>
+     *  <li>&lt; 0 to indicate that the request should expire immediately if no message.</li>
+     *  <li>= 0 to indicate that the request should never time out.</li>
+     *  <li>&gt; 0 to indicate that the request should expire after the given time in milliseconds.</li>
+     * </ul>
+     * @param timeout
+     *        The amount of time the pull request should remain valid.
+     */
+    protected void sendPullCommand(long timeout) throws JMSException {
+        if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+            connection.pull(getConsumerId(), timeout);
+        }
+    }
+
+    /**
+     * Selects the prefetch limit from the policy that matches this consumer's kind:
+     * durable or plain topic subscriber, queue browser, or plain queue consumer.
+     *
+     * @param destination
+     *        the destination this consumer reads from.
+     * @param policy
+     *        the prefetch policy holding the per-kind limits.
+     * @return the prefetch value that applies to this consumer.
+     */
+    private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) {
+        if (destination.isTopic()) {
+            return isDurableSubscription() ? policy.getDurableTopicPrefetch() : policy.getTopicPrefetch();
+        }
+
+        // Any destination that is not a topic gets one of the queue limits.
+        return isBrowser() ? policy.getQueueBrowserPrefetch() : policy.getQueuePrefetch();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
new file mode 100644
index 0000000..602e8b0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+public interface JmsMessageDispatcher {
+
+    /**
+     * Called when a new Message delivery is in progress.
+     *
+     * @param envelope
+     *        the incoming message dispatch information.
+     */
+    void onMessage(JmsInboundMessageDispatch envelope);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
new file mode 100644
index 0000000..4d09c04
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+
+/**
+ * Implementation of a Jms MessageProducer
+ */
+public class JmsMessageProducer implements MessageProducer {
+
+    protected final JmsSession session;
+    protected final JmsConnection connection;
+    protected JmsProducerInfo producerInfo;
+    protected final boolean flexibleDestination;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected int priority = Message.DEFAULT_PRIORITY;
+    protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+    protected final AtomicBoolean closed = new AtomicBoolean();
+    protected boolean disableMessageId;
+    protected boolean disableTimestamp;
+    protected final AtomicLong messageSequence = new AtomicLong();
+
+    protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
+        this.session = session;
+        this.connection = session.getConnection();
+        this.flexibleDestination = destination == null;
+        this.producerInfo = new JmsProducerInfo(producerId);
+        this.producerInfo.setDestination(destination);
+        this.producerInfo = session.getConnection().createResource(producerInfo);
+    }
+
+    /**
+     * Closes the producer; calling this on an already closed producer does nothing.
+     *
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#close()
+     */
+    @Override
+    public void close() throws JMSException {
+        if (closed.get()) {
+            return;
+        }
+        doClose();
+    }
+
+    /**
+     * Called to initiate shutdown of Producer resources and request that the remote
+     * peer remove the registered producer.
+     *
+     * @throws JMSException
+     */
+    protected void doClose() throws JMSException {
+        shutdown();
+        this.connection.destroyResource(producerInfo);
+    }
+
+    /**
+     * Called to release all producer resources without requiring a destroy request
+     * to be sent to the remote peer.  This is most commonly needed when the parent
+     * Session is closing.
+     *
+     * @throws JMSException
+     */
+    protected void shutdown() throws JMSException {
+        if (closed.compareAndSet(false, true)) {
+            this.session.remove(this);
+        }
+    }
+
+    /**
+     * @return the delivery mode
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     */
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        checkClosed();
+        return this.deliveryMode;
+    }
+
+    /**
+     * @return the destination
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getDestination()
+     */
+    @Override
+    public Destination getDestination() throws JMSException {
+        checkClosed();
+        return this.producerInfo.getDestination();
+    }
+
+    /**
+     * @return true if disableIds is set
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getDisableMessageID()
+     */
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        checkClosed();
+        return this.disableMessageId;
+    }
+
+    /**
+     * @return true if disable timestamp is set
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getDisableMessageTimestamp()
+     */
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        checkClosed();
+        return this.disableTimestamp;
+    }
+
+    /**
+     * @return the priority
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+    @Override
+    public int getPriority() throws JMSException {
+        checkClosed();
+        return this.priority;
+    }
+
+    /**
+     * @return timeToLive
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     */
+    @Override
+    public long getTimeToLive() throws JMSException {
+        checkClosed();
+        return this.timeToLive;
+    }
+
+    /**
+     * @param message
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#send(javax.jms.Message)
+     */
+    @Override
+    public void send(Message message) throws JMSException {
+        send(producerInfo.getDestination(), message, this.deliveryMode, this.priority, this.timeToLive);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#send(javax.jms.Destination,
+     *      javax.jms.Message)
+     */
+    @Override
+    public void send(Destination destination, Message message) throws JMSException {
+        send(destination, message, this.deliveryMode, this.priority, this.timeToLive);
+    }
+
+    /**
+     * @param message
+     * @param deliveryMode
+     * @param priority
+     * @param timeToLive
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        send(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @param deliveryMode
+     * @param priority
+     * @param timeToLive
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#send(javax.jms.Destination,
+     *      javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        checkClosed();
+
+        if (destination == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        }
+        if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) {
+            throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName());
+        }
+
+        this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp);
+    }
+
+    /**
+     * @param deliveryMode
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#setDeliveryMode(int)
+     */
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        checkClosed();
+        this.deliveryMode = deliveryMode;
+    }
+
+    /**
+     * @param value
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#setDisableMessageID(boolean)
+     */
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageId = value;
+    }
+
+    /**
+     * @param value
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#setDisableMessageTimestamp(boolean)
+     */
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        checkClosed();
+        this.disableTimestamp = value;
+    }
+
+    /**
+     * @param defaultPriority
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#setPriority(int)
+     */
+    @Override
+    public void setPriority(int defaultPriority) throws JMSException {
+        checkClosed();
+        this.priority = defaultPriority;
+    }
+
+    /**
+     * @param timeToLive
+     * @throws JMSException
+     * @see javax.jms.MessageProducer#setTimeToLive(long)
+     */
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        checkClosed();
+        this.timeToLive = timeToLive;
+    }
+
+    /**
+     * @param destination
+     *        the destination to set
+     * @throws JMSException
+     * @throws InvalidDestinationException
+     */
+    public void setDestination(Destination destination) throws JMSException {
+        if (destination == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        }
+        if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) {
+            throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName());
+        }
+        producerInfo.setDestination(JmsMessageTransformation.transformDestination(session.getConnection(), destination));
+    }
+
+    /**
+     * @return the producer's assigned JmsProducerId.
+     */
+    protected JmsProducerId getProducerId() {
+        return this.producerInfo.getProducerId();
+    }
+
+    /**
+     * @return the next logical sequence for a Message sent from this Producer.
+     */
+    protected long getNextMessageSequence() {
+        return this.messageSequence.incrementAndGet();
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (closed.get()) {
+            throw new IllegalStateException("The MessageProducer is closed");
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Connection interruption handlers.
+    ////////////////////////////////////////////////////////////////////////////
+
+    protected void onConnectionInterrupted() {
+    }
+
+    protected void onConnectionRecovery(Provider provider) throws Exception {
+        ProviderFuture request = new ProviderFuture();
+        provider.create(producerInfo, request);
+        request.sync();
+    }
+
+    protected void onConnectionRecovered(Provider provider) throws Exception {
+    }
+
+    protected void onConnectionRestored() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
new file mode 100644
index 0000000..c1212f2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the prefetch message policies for different types of consumers.
+ * <p>
+ * Every prefetch value assigned through a setter is capped at the currently configured max
+ * prefetch size, which starts out as {@link #MAX_PREFETCH_SIZE}.
+ */
+public class JmsPrefetchPolicy implements Serializable {
+
+    private static final long serialVersionUID = 5298685386681646744L;
+
+    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
+    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsPrefetchPolicy.class);
+
+    private int queuePrefetch;
+    private int queueBrowserPrefetch;
+    private int topicPrefetch;
+    private int durableTopicPrefetch;
+    private int maxPrefetchSize = MAX_PREFETCH_SIZE;
+
+    /**
+     * Initialize default prefetch policies
+     */
+    public JmsPrefetchPolicy() {
+        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+    }
+
+    /**
+     * Creates a new JmsPrefetchPolicy instance copied from the source policy.  The max
+     * prefetch size of the source is copied as well so that later setter calls on the copy
+     * are capped at the same limit as on the source.
+     *
+     * @param source
+     *      The policy instance to copy values from.
+     */
+    public JmsPrefetchPolicy(JmsPrefetchPolicy source) {
+        this.maxPrefetchSize = source.getMaxPrefetchSize();
+        this.queuePrefetch = source.getQueuePrefetch();
+        this.queueBrowserPrefetch = source.getQueueBrowserPrefetch();
+        this.topicPrefetch = source.getTopicPrefetch();
+        this.durableTopicPrefetch = source.getDurableTopicPrefetch();
+    }
+
+    /**
+     * @return Returns the durableTopicPrefetch.
+     */
+    public int getDurableTopicPrefetch() {
+        return durableTopicPrefetch;
+    }
+
+    /**
+     * Sets the durable topic prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param durableTopicPrefetch
+     *        The durableTopicPrefetch to set.
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
+    }
+
+    /**
+     * @return Returns the queuePrefetch.
+     */
+    public int getQueuePrefetch() {
+        return queuePrefetch;
+    }
+
+    /**
+     * Sets the queue prefetch value, this value is limited by the max prefetch size
+     * setting.
+     *
+     * @param queuePrefetch
+     *        The queuePrefetch to set.
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
+    }
+
+    /**
+     * @return Returns the queueBrowserPrefetch.
+     */
+    public int getQueueBrowserPrefetch() {
+        return queueBrowserPrefetch;
+    }
+
+    /**
+     * Sets the queue browser prefetch value, this value is limited by the max prefetch
+     * size setting.
+     *
+     * @param queueBrowserPrefetch
+     *        The queueBrowserPrefetch to set.
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
+    }
+
+    /**
+     * @return Returns the topicPrefetch.
+     */
+    public int getTopicPrefetch() {
+        return topicPrefetch;
+    }
+
+    /**
+     * Sets the topic prefetch value, this value is limited by the max prefetch size
+     * setting.
+     *
+     * @param topicPrefetch
+     *        The topicPrefetch to set.
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
+    }
+
+    /**
+     * Gets the currently configured max prefetch size value.
+     * @return the currently configured max prefetch value.
+     */
+    public int getMaxPrefetchSize() {
+        return maxPrefetchSize;
+    }
+
+    /**
+     * Sets the maximum prefetch size value.  Only values assigned after this call are
+     * capped at the new limit; prefetch values that are already stored are not changed.
+     *
+     * @param maxPrefetchSize
+     *        The maximum allowed value for any of the prefetch size options.
+     */
+    public void setMaxPrefetchSize(int maxPrefetchSize) {
+        this.maxPrefetchSize = maxPrefetchSize;
+    }
+
+    /**
+     * Sets the prefetch values for all options in this policy to the set limit.  If the value
+     * given is larger than the max prefetch value of this policy the new limit will be capped
+     * at the max prefetch value.
+     *
+     * @param prefetch
+     *      The prefetch value to apply to all prefetch limits.
+     */
+    public void setAll(int prefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(prefetch);
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(prefetch);
+        this.queuePrefetch = getMaxPrefetchLimit(prefetch);
+        this.topicPrefetch = getMaxPrefetchLimit(prefetch);
+    }
+
+    /**
+     * Two policies are equal when their queue, queue browser, topic and durable topic
+     * prefetch values all match.  The max prefetch size is not part of the comparison.
+     *
+     * @param object
+     *        the object to compare against.
+     *
+     * @return true if the given object is a JmsPrefetchPolicy with the same prefetch values.
+     */
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof JmsPrefetchPolicy) {
+            JmsPrefetchPolicy other = (JmsPrefetchPolicy) object;
+            return this.queuePrefetch == other.queuePrefetch && this.queueBrowserPrefetch == other.queueBrowserPrefetch
+                && this.topicPrefetch == other.topicPrefetch && this.durableTopicPrefetch == other.durableTopicPrefetch;
+        }
+        return false;
+    }
+
+    /**
+     * Computes a hash from exactly the fields compared in {@link #equals(Object)} so that
+     * equal policies always produce equal hash codes.
+     *
+     * @return the hash code of this policy.
+     */
+    @Override
+    public int hashCode() {
+        int result = 17;
+        result = 31 * result + queuePrefetch;
+        result = 31 * result + queueBrowserPrefetch;
+        result = 31 * result + topicPrefetch;
+        result = 31 * result + durableTopicPrefetch;
+        return result;
+    }
+
+    /**
+     * Caps the given value at the configured max prefetch size and logs a warning whenever
+     * the value had to be reduced.
+     *
+     * @param value
+     *        the requested prefetch value.
+     *
+     * @return the smaller of the requested value and the configured max prefetch size.
+     */
+    private int getMaxPrefetchLimit(int value) {
+        int result = Math.min(value, maxPrefetchSize);
+        if (result < value) {
+            // Report the limit that was actually applied, not the MAX_PREFETCH_SIZE constant.
+            LOG.warn("maximum prefetch limit has been reset from {} to {}", value, result);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
new file mode 100644
index 0000000..d9e397c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Queue;
+
+/**
+ * JmsDestination subclass that implements the JMS Queue interface.
+ */
+public class JmsQueue extends JmsDestination implements Queue {
+
+    /**
+     * Creates a JmsQueue without a name.
+     */
+    public JmsQueue() {
+        this(null);
+    }
+
+    /**
+     * Creates a JmsQueue with the given name.
+     *
+     * @param name
+     *        the name handed to the JmsDestination constructor.
+     */
+    public JmsQueue(String name) {
+        super(name, false, false);
+    }
+
+    /**
+     * Creates a new JmsQueue and applies this queue's properties to it.
+     *
+     * @return the newly created JmsQueue.
+     */
+    @Override
+    public JmsQueue copy() {
+        final JmsQueue duplicate = new JmsQueue();
+        duplicate.setProperties(getProperties());
+        return duplicate;
+    }
+
+    /**
+     * @return the value of getName() for this destination.
+     * @see javax.jms.Queue#getQueueName()
+     */
+    @Override
+    public String getQueueName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
new file mode 100644
index 0000000..ce20d42
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a queue without
+ * removing them.
+ * <p/>
+ * <p/>
+ * The <CODE>getEnumeration</CODE> method returns a <CODE>
+ * java.util.Enumeration</CODE> that is used to scan the queue's messages. It may be an
+ * enumeration of the entire content of a queue, or it may contain only the messages matching a
+ * message selector.
+ * <p/>
+ * <p/>
+ * Messages may be arriving and expiring while the scan is done. The JMS API does not require
+ * the content of an enumeration to be a static snapshot of queue content. Whether these changes
+ * are visible or not depends on the JMS provider.
+ * <p/>
+ * <p/>
+ * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
+ * </CODE> or a <CODE>QueueSession</CODE>.
+ *
+ * @see javax.jms.Session#createBrowser
+ * @see javax.jms.QueueSession#createBrowser
+ * @see javax.jms.QueueBrowser
+ * @see javax.jms.QueueReceiver
+ */
+public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowser.class);
+
+    private final JmsSession session;
+    private final JmsDestination destination;
+    private final String selector;
+
+    private JmsMessageConsumer consumer;
+    private final AtomicBoolean browseDone = new AtomicBoolean(false);
+
+    private Message next;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final Object semaphore = new Object();
+
+    /**
+     * Constructor for a JmsQueueBrowser - used internally.  Only stores the given values;
+     * the browsing consumer is created later by getEnumeration().
+     *
+     * @param session
+     *        the session this browser belongs to; used later to create the consumer.
+     * @param destination
+     *        the destination to browse; getQueue() returns it cast to Queue.
+     * @param selector
+     *        the message selector returned by getMessageSelector() and handed to the consumer.
+     *
+     * @throws javax.jms.JMSException
+     *         declared by the signature; the body shown here only assigns fields.
+     */
+    protected JmsQueueBrowser(JmsSession session, JmsDestination destination, String selector) throws JMSException {
+        this.session = session;
+        this.destination = destination;
+        this.selector = selector;
+    }
+
+    /**
+     * Closes and discards the browsing consumer if one exists.  When the session is
+     * transacted the session is committed before the consumer is closed.
+     * <p>
+     * A JMSException raised by the commit or the close is logged and not rethrown; in that
+     * case the consumer reference is left in place.
+     */
+    private void destroyConsumer() {
+        if (consumer == null) {
+            return;
+        }
+        try {
+            if (session.getTransacted()) {
+                session.commit();
+            }
+            consumer.close();
+            consumer = null;
+        } catch (JMSException e) {
+            // Route the failure through the class logger instead of dumping it on stderr.
+            LOG.warn("Error while closing the browser's consumer: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns this browser as the enumeration over the queue's messages, creating the
+     * underlying browsing consumer first when none exists yet.
+     *
+     * @return an enumeration for browsing the messages (this browser instance).
+     * @throws javax.jms.JMSException
+     *         if this browser has been closed or the consumer cannot be created.
+     */
+    @Override
+    public Enumeration<Message> getEnumeration() throws JMSException {
+        checkClosed();
+        if (consumer != null) {
+            return this;
+        }
+        consumer = createConsumer();
+        return this;
+    }
+
+    private void checkClosed() throws IllegalStateException {
+        if (closed.get()) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    /**
+     * Checks whether another message can be returned by nextElement().
+     * <p>
+     * Loops until one of the following happens: there is no consumer (returns false), a
+     * message is already buffered in {@code next} or could be fetched with receiveNoWait()
+     * (returns true), or the browse is flagged done / the session is not started (destroys
+     * the consumer and returns false).  Between attempts the call blocks in
+     * waitForMessage().
+     *
+     * @return true if more messages to process
+     */
+    @Override
+    public boolean hasMoreElements() {
+        while (true) {
+            // Without a consumer there is nothing to browse.
+            synchronized (this) {
+                if (consumer == null) {
+                    return false;
+                }
+            }
+
+            if (next == null) {
+                try {
+                    next = consumer.receiveNoWait();
+                } catch (JMSException e) {
+                    // The failure is only logged; the loop carries on with next still null.
+                    LOG.warn("Error while receive the next message: {}", e.getMessage());
+                    // TODO - Add client internal error listener.
+                    // this.session.connection.onClientInternalException(e);
+                }
+
+                if (next != null) {
+                    return true;
+                }
+            } else {
+                // A previously fetched message has not been handed out yet.
+                return true;
+            }
+
+            // Nothing was received: stop once the browse is done or the session is not started.
+            if (browseDone.get() || !session.isStarted()) {
+                destroyConsumer();
+                return false;
+            }
+
+            waitForMessage();
+        }
+    }
+
+    /**
+     * Returns the buffered message found by hasMoreElements() and clears the buffer.
+     * <p>
+     * Note that this method returns null rather than throwing in two cases: when no consumer
+     * exists on entry, and when hasMoreElements() reports false while the browse is flagged
+     * done or the session is not started.
+     *
+     * @return the next message if one exists, or null in the cases described above.
+     *
+     * @throws NoSuchElementException if hasMoreElements() reports false while the browse is
+     *         not flagged done and the session is started.
+     */
+    @Override
+    public Message nextElement() {
+        synchronized (this) {
+            if (consumer == null) {
+                return null;
+            }
+        }
+
+        if (hasMoreElements()) {
+            // Hand out the buffered message and reset the buffer for the next fetch.
+            Message message = next;
+            next = null;
+            return message;
+        }
+
+        if (browseDone.get() || !session.isStarted()) {
+            destroyConsumer();
+            return null;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Closes this browser.  Only the first call has an effect: it flags the browse as done
+     * and destroys the browsing consumer.  Later calls return immediately.
+     *
+     * @throws JMSException
+     *         declared by the QueueBrowser contract.
+     */
+    @Override
+    public void close() throws JMSException {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        browseDone.set(true);
+        destroyConsumer();
+    }
+
+    /**
+     * Gets the queue associated with this queue browser.
+     *
+     * @return the queue, which is this browser's destination cast to Queue.
+     * @throws javax.jms.JMSException
+     *         if the JMS provider fails to get the queue associated with this browser due to
+     *         some internal error.
+     */
+
+    @Override
+    public Queue getQueue() throws JMSException {
+        return (Queue) destination;
+    }
+
+    /**
+     * Gets the message selector this browser was constructed with.
+     *
+     * @return the selector passed to the constructor, unchanged.
+     * @throws javax.jms.JMSException
+     *         declared by the QueueBrowser contract; the body only returns a field.
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return selector;
+    }
+
+    /**
+     * Wait on a semaphore for a fixed amount of time for a message to come in.
+     * <p>
+     * The wait lasts at most 2000 milliseconds and ends early when
+     * notifyMessageAvailable() signals the semaphore.  If the waiting thread is
+     * interrupted its interrupt status is set again before returning.
+     */
+    protected void waitForMessage() {
+        try {
+            synchronized (semaphore) {
+                semaphore.wait(2000);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Wakes every thread currently waiting on the semaphore in waitForMessage().
+     */
+    protected void notifyMessageAvailable() {
+        synchronized (semaphore) {
+            semaphore.notifyAll();
+        }
+    }
+
+    /**
+     * @return a description of this browser containing the id of its current consumer, or
+     *         the text "null" when there is no consumer.
+     */
+    @Override
+    public String toString() {
+        final JmsMessageConsumer current = this.consumer;
+        final Object id = (current != null) ? current.getConsumerId() : "null";
+        return "JmsQueueBrowser { value=" + id + " }";
+    }
+
+    /**
+     * Creates and initializes the consumer used for browsing.  Resets the browse done flag
+     * first, then builds an anonymous JmsMessageConsumer subclass that reports itself as a
+     * browser and intercepts incoming dispatches.
+     *
+     * @return the initialized browsing consumer.
+     *
+     * @throws JMSException
+     *         if creating or initializing the consumer fails.
+     */
+    private JmsMessageConsumer createConsumer() throws JMSException {
+        browseDone.set(false);
+        JmsMessageConsumer rc = new JmsMessageConsumer(session.getNextConsumerId(), session, destination, selector, false) {
+
+            @Override
+            public boolean isBrowser() {
+                return true;
+            }
+
+            @Override
+            public void onMessage(JmsInboundMessageDispatch envelope) {
+                // A dispatch without a message marks the end of the browse; anything else
+                // is passed on to the regular consumer handling.
+                if (envelope.getMessage() == null) {
+                    browseDone.set(true);
+                } else {
+                    super.onMessage(envelope);
+                }
+                // In both cases wake up any thread blocked in waitForMessage().
+                notifyMessageAvailable();
+            }
+        };
+        rc.init();
+        return rc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
new file mode 100644
index 0000000..39eadeb
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.util.IdGenerator;
+
+/**
+ * JmsConnection variant whose Topic related operations are rejected with a
+ * javax.jms.IllegalStateException.
+ */
+public class JmsQueueConnection extends JmsConnection {
+
+    private static final String UNSUPPORTED_OPERATION = "Operation not supported by a QueueConnection";
+
+    /**
+     * Creates the connection by passing all arguments to the JmsConnection constructor.
+     *
+     * @param connectionId
+     *        the id handed to the JmsConnection constructor.
+     * @param provider
+     *        the Provider handed to the JmsConnection constructor.
+     * @param clientIdGenerator
+     *        the IdGenerator handed to the JmsConnection constructor.
+     *
+     * @throws JMSException
+     *         if the JmsConnection constructor fails.
+     */
+    public JmsQueueConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+        super(connectionId, provider, clientIdGenerator);
+    }
+
+    /**
+     * Always fails: Topic sessions cannot be created from this connection.
+     *
+     * @throws JMSException
+     *         always, as a javax.jms.IllegalStateException.
+     */
+    @Override
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        throw new javax.jms.IllegalStateException(UNSUPPORTED_OPERATION);
+    }
+
+    /**
+     * Always fails: durable connection consumers cannot be created from this connection.
+     *
+     * @throws JMSException
+     *         always, as a javax.jms.IllegalStateException.
+     */
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        throw new javax.jms.IllegalStateException(UNSUPPORTED_OPERATION);
+    }
+
+    /**
+     * Always fails: Topic connection consumers cannot be created from this connection.
+     *
+     * @throws JMSException
+     *         always, as a javax.jms.IllegalStateException.
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        throw new javax.jms.IllegalStateException(UNSUPPORTED_OPERATION);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
new file mode 100644
index 0000000..aa1da65
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * JmsMessageConsumer subclass that implements the JMS QueueReceiver interface.
+ */
+public class JmsQueueReceiver extends JmsMessageConsumer implements QueueReceiver {
+
+    /**
+     * Creates the receiver by passing the arguments, plus a fixed {@code false} as the last
+     * argument, to the JmsMessageConsumer constructor.
+     *
+     * @param id
+     *      This receiver's assigned Id.
+     * @param session
+     *      The session that created this receiver.
+     * @param dest
+     *      The destination that this receiver listens on.
+     * @param selector
+     *      The selector used to filter messages for this receiver.
+     *
+     * @throws JMSException
+     *      if the JmsMessageConsumer constructor fails.
+     */
+    protected JmsQueueReceiver(JmsConsumerId id, JmsSession session, JmsDestination dest, String selector) throws JMSException {
+        super(id, session, dest, selector, false);
+    }
+
+    /**
+     * Returns this receiver's destination cast to Queue, after checking that the receiver
+     * is still open.
+     *
+     * @return the Queue
+     * @throws IllegalStateException
+     *      if checkClosed() rejects the call.
+     * @see javax.jms.QueueReceiver#getQueue()
+     */
+    @Override
+    public Queue getQueue() throws IllegalStateException {
+        checkClosed();
+        final Queue queue = (Queue) getDestination();
+        return queue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
new file mode 100644
index 0000000..c2276c8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.apache.qpid.jms.meta.JmsProducerId;
+
+/**
+ * Implementation of a Queue Sender
+ */
+public class JmsQueueSender extends JmsMessageProducer implements QueueSender {
+
+    /**
+     * Creates the sender by passing all arguments to the JmsMessageProducer constructor.
+     *
+     * @param id
+     *        the producer id handed to the JmsMessageProducer constructor.
+     * @param session
+     *        the session handed to the JmsMessageProducer constructor.
+     * @param destination
+     *        the destination handed to the JmsMessageProducer constructor.
+     *
+     * @throws JMSException
+     *         if the JmsMessageProducer constructor fails.
+     */
+    protected JmsQueueSender(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException {
+        super(id, session, destination);
+    }
+
+    /**
+     * Returns the destination stored in this sender's producer info cast to Queue, after
+     * checking that the sender is still open.
+     *
+     * @return the Queue
+     * @throws IllegalStateException
+     *         if checkClosed() rejects the call.
+     * @see javax.jms.QueueSender#getQueue()
+     */
+    @Override
+    public Queue getQueue() throws IllegalStateException {
+        checkClosed();
+        return (Queue) this.producerInfo.getDestination();
+    }
+
+    /**
+     * Sends the message to the given queue by delegating to the inherited
+     * send(Destination, Message).
+     *
+     * @param queue
+     *        the queue to send the message to.
+     * @param message
+     *        the message to send.
+     * @throws JMSException
+     *         if the inherited send fails.
+     * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message)
+     */
+    @Override
+    public void send(Queue queue, Message message) throws JMSException {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends the message to the given queue with the given delivery options by delegating to
+     * the inherited send(Destination, Message, int, int, long).
+     *
+     * @param queue
+     *        the queue to send the message to.
+     * @param message
+     *        the message to send.
+     * @param deliveryMode
+     *        the delivery mode passed on to the inherited send.
+     * @param priority
+     *        the priority passed on to the inherited send.
+     * @param timeToLive
+     *        the time to live passed on to the inherited send.
+     * @throws JMSException
+     *         if the inherited send fails.
+     * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        // The queue must be forwarded; the four-argument overload would send to the
+        // producer's own destination and silently ignore the requested queue.
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
new file mode 100644
index 0000000..274c0a7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.meta.JmsSessionId;
+
+/**
+ * JMS QueueSession implementation.
+ * <p>
+ * Extends {@link JmsSession} and rejects Topic-specific operations with an
+ * {@link IllegalStateException}. Consumer and producer creation is rejected
+ * only when the supplied destination is a {@link Topic}; otherwise the call is
+ * delegated to the parent session.
+ */
+public class JmsQueueSession extends JmsSession {
+
+    /** Message carried by every exception thrown for a rejected operation. */
+    private static final String NOT_SUPPORTED = "Operation not supported by a QueueSession";
+
+    /**
+     * Creates a new session; all arguments are passed straight to the
+     * {@link JmsSession} constructor.
+     *
+     * @param connection
+     *        the connection handed to the parent session.
+     * @param sessionId
+     *        the session id handed to the parent session.
+     * @param acknowledgementMode
+     *        the acknowledgement mode handed to the parent session.
+     * @throws JMSException
+     *         if the parent constructor fails.
+     */
+    protected JmsQueueSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+        super(connection, sessionId, acknowledgementMode);
+    }
+
+    /**
+     * Rejects destinations that are Topics.
+     *
+     * @param destination
+     *        the destination to inspect; {@code null} passes the check.
+     * @throws IllegalStateException
+     *         if the destination is an instance of {@link Topic}.
+     */
+    private static void checkNotTopic(Destination destination) throws IllegalStateException {
+        if (destination instanceof Topic) {
+            throw new IllegalStateException(NOT_SUPPORTED);
+        }
+    }
+
+    /**
+     * Creates a consumer for a non-Topic destination.
+     *
+     * @param destination
+     *        the destination to consume from.
+     * @return the consumer created by the parent session.
+     * @throws JMSException
+     *         if the destination is a Topic, or if the parent session fails.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        checkNotTopic(destination);
+        return super.createConsumer(destination);
+    }
+
+    /**
+     * Creates a selector based consumer for a non-Topic destination.
+     *
+     * @param destination
+     *        the destination to consume from.
+     * @param messageSelector
+     *        the selector passed to the parent session.
+     * @return the consumer created by the parent session.
+     * @throws JMSException
+     *         if the destination is a Topic, or if the parent session fails.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        checkNotTopic(destination);
+        return super.createConsumer(destination, messageSelector);
+    }
+
+    /**
+     * Creates a selector based consumer for a non-Topic destination, passing the
+     * noLocal flag through to the parent session.
+     * <p>
+     * Only Topic destinations are rejected, matching the other
+     * {@code createConsumer} overloads of this class.
+     *
+     * @param destination
+     *        the destination to consume from.
+     * @param messageSelector
+     *        the selector passed to the parent session.
+     * @param noLocal
+     *        the noLocal flag passed to the parent session.
+     * @return the consumer created by the parent session.
+     * @throws JMSException
+     *         if the destination is a Topic, or if the parent session fails.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        checkNotTopic(destination);
+        return super.createConsumer(destination, messageSelector, noLocal);
+    }
+
+    /**
+     * Always rejected: durable subscribers are not available on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param name
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: durable subscribers are not available on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param name
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @param noLocal
+     *        ignored.
+     * @return never returns normally.
+     * @throws IllegalStateException
+     *         always.
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String, java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws IllegalStateException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Creates a producer for a non-Topic destination.
+     *
+     * @param destination
+     *        the destination to produce to.
+     * @return the producer created by the parent session.
+     * @throws JMSException
+     *         if the destination is a Topic, or if the parent session fails.
+     * @see javax.jms.Session#createProducer(javax.jms.Destination)
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        checkNotTopic(destination);
+        return super.createProducer(destination);
+    }
+
+    /**
+     * Always rejected: temporary topics are not available on a QueueSession.
+     *
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.Session#createTemporaryTopic()
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: topics are not available on a QueueSession.
+     *
+     * @param topicName
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.Session#createTopic(java.lang.String)
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: subscriptions are not available on a QueueSession.
+     *
+     * @param name
+     *        ignored.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.Session#unsubscribe(java.lang.String)
+     */
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: publishers are not available on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+     */
+    @Override
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: subscribers are not available on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected: subscribers are not available on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @param noLocal
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         always, as an {@link IllegalStateException}.
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org