You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:49 UTC
[25/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
new file mode 100644
index 0000000..880960f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.MessageConsumer;
+
+
+/**
+ * Internal callback through which a consumer signals that it has a message ready.
+ */
+public interface JmsMessageAvailableListener {
+
+    /**
+     * Invoked when a Message becomes available for a client to receive.
+     *
+     * @param consumer
+     *        the MessageConsumer instance that has a message available.
+     */
+    void onMessageAvailable(MessageConsumer consumer);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
new file mode 100644
index 0000000..07cba2a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.apache.qpid.jms.util.PriorityMessageQueue;
+
+/**
+ * implementation of a JMS Message Consumer
+ */
+public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher {
+
+ // Session that created this consumer; used for registration, acknowledgement and dispatch.
+ protected final JmsSession session;
+ // Connection obtained from the session in the constructor.
+ protected final JmsConnection connection;
+ // Consumer meta-data; replaced in the constructor by the value createResource() returns.
+ protected JmsConsumerInfo consumerInfo;
+ // Acknowledgement mode captured from the session at construction time.
+ protected final int acknowledgementMode;
+ // Set once by shutdown(); checked by checkClosed().
+ protected final AtomicBoolean closed = new AtomicBoolean();
+ // Toggled by start() and stop() while holding 'lock'.
+ protected boolean started;
+ // Asynchronous listener, may be null.
+ protected MessageListener messageListener;
+ // Notified from onMessage() when no started MessageListener handles the message; may be null.
+ protected JmsMessageAvailableListener availableListener;
+ // Holds dispatched messages until they are received or handed to the listener.
+ protected final MessageQueue messageQueue;
+ // Held while enqueueing in onMessage() and during start() / stop().
+ protected final Lock lock = new ReentrantLock();
+ // NOTE(review): not referenced by the consumer code visible here - confirm where it is used.
+ protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
+ // Set by ack() once a message was handed out; close() consults it when in a transaction.
+ protected final AtomicBoolean delivered = new AtomicBoolean();
+
+ /**
+  * Creates a MessageConsumer that has no subscription name (non-durable) by
+  * delegating to the full constructor with a {@code null} name.
+  *
+  * @param consumerId
+  *        the id assigned to the new consumer.
+  * @param session
+  *        the session that owns the consumer.
+  * @param destination
+  *        the destination to consume from.
+  * @param selector
+  *        the message selector, passed through to the consumer info.
+  * @param noLocal
+  *        the noLocal flag, passed through to the consumer info.
+  * @throws JMSException
+  *         if the delegated constructor fails.
+  */
+ protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
+                              String selector, boolean noLocal) throws JMSException {
+     this(consumerId, session, destination, null, selector, noLocal);
+ }
+
+ /**
+  * Creates a MessageConsumer, optionally carrying a subscription name, and
+  * registers its consumer info with the connection via createResource().
+  *
+  * @param consumerId
+  *        the id assigned to the new consumer.
+  * @param session
+  *        the session that owns the consumer.
+  * @param destination
+  *        the destination to consume from.
+  * @param name
+  *        the subscription name, may be {@code null}.
+  * @param selector
+  *        the message selector stored in the consumer info.
+  * @param noLocal
+  *        the noLocal flag stored in the consumer info.
+  * @throws JMSException
+  *         if the connection fails to create the consumer resource.
+  */
+ protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
+                              String name, String selector, boolean noLocal) throws JMSException {
+     this.session = session;
+     this.connection = session.getConnection();
+     this.acknowledgementMode = session.acknowledgementMode();
+
+     // Priority ordering is only used when the connection reports support for it.
+     this.messageQueue = connection.isMessagePrioritySupported()
+         ? new PriorityMessageQueue()
+         : new FifoMessageQueue();
+
+     JmsPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
+
+     JmsConsumerInfo info = new JmsConsumerInfo(consumerId);
+     // Publish the field before the overridable isBrowser() / prefetch lookups run.
+     this.consumerInfo = info;
+     info.setClientId(connection.getClientID());
+     info.setSelector(selector);
+     info.setSubscriptionName(name);
+     info.setDestination(destination);
+     info.setAcknowledgementMode(acknowledgementMode);
+     info.setNoLocal(noLocal);
+     info.setBrowser(isBrowser());
+     info.setPrefetchSize(getConfiguredPrefetch(destination, prefetchPolicy));
+
+     // Any JMSException from the create request propagates unchanged to the caller.
+     this.consumerInfo = session.getConnection().createResource(info);
+ }
+
+ /**
+  * Adds this consumer to its session and asks the connection to start the
+  * consumer resource. When the start request throws a JMSException the
+  * consumer is removed from the session again before the error is rethrown.
+  *
+  * @throws JMSException
+  *         if the start request fails.
+  */
+ public void init() throws JMSException {
+     session.add(this);
+     try {
+         session.getConnection().startResource(consumerInfo);
+     } catch (JMSException startFailure) {
+         session.remove(this);
+         throw startFailure;
+     }
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#close()
+ */
+ @Override
+ public void close() throws JMSException {
+ if (!closed.get()) {
+ if (delivered.get() && session.getTransactionContext().isInTransaction()) {
+ session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {
+ @Override
+ public void afterCommit() throws Exception {
+ doClose();
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ doClose();
+ }
+ });
+ } else {
+ doClose();
+ }
+ }
+ }
+
+ /**
+  * Called to initiate shutdown of this consumer's resources and request that
+  * the connection destroy the registered consumer resource.
+  *
+  * @throws JMSException
+  *         if shutdown() or the destroy request fails.
+  */
+ protected void doClose() throws JMSException {
+ shutdown();
+ this.connection.destroyResource(consumerInfo);
+ }
+
+ /**
+  * Called to release this consumer without sending a destroy request: marks
+  * the consumer closed and, if this call performed the transition, removes it
+  * from the owning session. Per the original note this is most commonly needed
+  * when the parent Session is closing.
+  *
+  * @throws JMSException
+  *         declared for callers; nothing in the visible body raises it directly.
+  */
+ protected void shutdown() throws JMSException {
+ // compareAndSet makes the removal happen at most once.
+ if (closed.compareAndSet(false, true)) {
+ this.session.remove(this);
+ }
+ }
+
+ /**
+ * @return a Message or null if closed during the operation
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receive()
+ */
+ @Override
+ public Message receive() throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ sendPullCommand(0);
+
+ try {
+ return copy(ack(this.messageQueue.dequeue(-1)));
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * @param timeout
+ * @return a Message or null
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receive(long)
+ */
+ @Override
+ public Message receive(long timeout) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ sendPullCommand(timeout);
+
+ if (timeout > 0) {
+ try {
+ return copy(ack(this.messageQueue.dequeue(timeout)));
+ } catch (InterruptedException e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * @return a Message or null
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receiveNoWait()
+ */
+ @Override
+ public Message receiveNoWait() throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ sendPullCommand(-1);
+
+ return copy(ack(this.messageQueue.dequeueNoWait()));
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (this.closed.get()) {
+ throw new IllegalStateException("The MessageConsumer is closed");
+ }
+ }
+
+ /**
+  * Produces a copy of the message carried by the given envelope.
+  *
+  * @param envelope
+  *        the dispatch to read the message from, may be {@code null}.
+  * @return a copy of the message, or {@code null} when the envelope or its
+  *         message is {@code null}.
+  * @throws JMSException
+  *         if copying the message fails.
+  */
+ JmsMessage copy(final JmsInboundMessageDispatch envelope) throws JMSException {
+     if (envelope == null) {
+         return null;
+     }
+     if (envelope.getMessage() == null) {
+         return null;
+     }
+     return envelope.getMessage().copy();
+ }
+
+ /**
+  * Acknowledges the message in the given envelope as it is handed to the
+  * application. Messages with an acknowledge callback, or received in a
+  * transacted session, are only marked DELIVERED; all others are acknowledged
+  * as CONSUMED through doAck().
+  *
+  * @param envelope
+  *        the dispatch being handed out, may be {@code null}.
+  * @return the same envelope that was passed in.
+  * @throws JMSException
+  *         if the acknowledgement fails.
+  */
+ JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException {
+     if (envelope == null || envelope.getMessage() == null) {
+         return envelope;
+     }
+
+     JmsMessage message = envelope.getMessage();
+     boolean ackDeferred = message.getAcknowledgeCallback() != null || session.isTransacted();
+     if (ackDeferred) {
+         // The application now holds the message; report it as delivered so
+         // the credit window can expand and more messages arrive.
+         session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+     } else {
+         doAck(envelope);
+     }
+
+     // Remember that something was delivered; close() defers while in a TX.
+     delivered.set(true);
+     return envelope;
+ }
+
+ /**
+  * Acknowledges the envelope as CONSUMED through the session. A failure is
+  * reported to the session's onException() and then rethrown.
+  *
+  * @param envelope
+  *        the dispatch to acknowledge.
+  * @throws JMSException
+  *         if the consumer is closed or the acknowledgement fails.
+  */
+ private void doAck(final JmsInboundMessageDispatch envelope) throws JMSException {
+     checkClosed();
+     try {
+         session.acknowledge(envelope, ACK_TYPE.CONSUMED);
+     } catch (JMSException ackFailure) {
+         session.onException(ackFailure);
+         throw ackFailure;
+     }
+ }
+
+ /**
+  * Called from the session when a new Message has been dispatched to this Consumer
+  * from the connection. The envelope is enqueued under the lock; afterwards it is
+  * either drained to the MessageListener on the session executor (when a listener
+  * is set and the consumer is started) or announced to the available listener.
+  *
+  * @param envelope
+  *        the dispatch carrying the newly arrived message.
+  */
+ @Override
+ public void onMessage(final JmsInboundMessageDispatch envelope) {
+ lock.lock();
+ try {
+ if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+ // In client-ack mode the message gets a callback that acknowledges via the session.
+ envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (session.isClosed()) {
+ throw new javax.jms.IllegalStateException("Session closed.");
+ }
+ session.acknowledge();
+ return null;
+ }
+ });
+ }
+ this.messageQueue.enqueue(envelope);
+ } finally {
+ lock.unlock();
+ }
+
+ // NOTE(review): messageListener and started are read here without holding the lock.
+ if (this.messageListener != null && this.started) {
+ session.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ JmsInboundMessageDispatch envelope;
+ // Drains whatever is queued, not only the envelope that triggered this task.
+ while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) {
+ try {
+ messageListener.onMessage(copy(ack(envelope)));
+ } catch (Exception e) {
+ session.getConnection().onException(e);
+ }
+ }
+ }
+ });
+ } else {
+ if (availableListener != null) {
+ availableListener.onMessageAvailable(this);
+ }
+ }
+ }
+
+ /**
+  * Marks the consumer started, starts the message queue and hands any queued
+  * messages to the MessageListener, all while holding the lock.
+  */
+ public void start() {
+     lock.lock();
+     try {
+         started = true;
+         messageQueue.start();
+         drainMessageQueueToListener();
+     } finally {
+         lock.unlock();
+     }
+ }
+
+ /**
+  * Marks the consumer stopped and stops the message queue while holding the lock.
+  */
+ public void stop() {
+     lock.lock();
+     try {
+         started = false;
+         messageQueue.stop();
+     } finally {
+         lock.unlock();
+     }
+ }
+
+ void drainMessageQueueToListener() {
+ MessageListener listener = this.messageListener;
+ if (listener != null) {
+ if (!this.messageQueue.isEmpty()) {
+ List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll();
+ for (JmsInboundMessageDispatch envelope : drain) {
+ try {
+ listener.onMessage(copy(ack(envelope)));
+ } catch (Exception e) {
+ session.getConnection().onException(e);
+ }
+ }
+ drain.clear();
+ }
+ }
+ }
+
+ /**
+  * @return the consumer id held by this consumer's info object.
+  */
+ public JmsConsumerId getConsumerId() {
+     return consumerInfo.getConsumerId();
+ }
+
+ /**
+  * @return the destination held by this consumer's info object.
+  */
+ public JmsDestination getDestination() {
+     return consumerInfo.getDestination();
+ }
+
+ /**
+  * @return the currently assigned MessageListener, possibly {@code null}.
+  * @throws JMSException
+  *         if the consumer is closed.
+  * @see javax.jms.MessageConsumer#getMessageListener()
+  */
+ @Override
+ public MessageListener getMessageListener() throws JMSException {
+     checkClosed();
+     return messageListener;
+ }
+
+ /**
+  * Assigns the MessageListener for asynchronous delivery and immediately
+  * hands it any messages that are already queued.
+  *
+  * @param listener
+  *        the listener to assign.
+  * @throws JMSException
+  *         if the consumer is closed or its prefetch size is zero.
+  * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
+  */
+ @Override
+ public void setMessageListener(MessageListener listener) throws JMSException {
+     checkClosed();
+     if (consumerInfo.getPrefetchSize() == 0) {
+         // The two literals previously joined without a space ("supportedfor").
+         throw new JMSException("Illegal prefetch size of zero. This setting is not supported " +
+                                "for asynchronous consumers please set a value of at least 1");
+     }
+     this.messageListener = listener;
+     drainMessageQueueToListener();
+ }
+
+ /**
+  * @return the selector stored in this consumer's info object.
+  * @throws JMSException
+  *         if the consumer is closed.
+  * @see javax.jms.MessageConsumer#getMessageSelector()
+  */
+ @Override
+ public String getMessageSelector() throws JMSException {
+     checkClosed();
+     return consumerInfo.getSelector();
+ }
+
+ /**
+  * Reports the prefetch size recorded in this consumer's info object.
+  *
+  * @return the prefetch size configuration for this consumer.
+  */
+ public int getPrefetchSize() {
+     return consumerInfo.getPrefetchSize();
+ }
+
+ /**
+  * Delegates the message listener check to the owning session.
+  *
+  * @throws JMSException
+  *         if the session's check fails.
+  */
+ protected void checkMessageListener() throws JMSException {
+     this.session.checkMessageListener();
+ }
+
+ /**
+  * @return true when a MessageListener is currently assigned.
+  */
+ boolean hasMessageListener() {
+     return messageListener != null;
+ }
+
+ /**
+  * @param destination
+  *        the destination to compare against.
+  * @return true when this consumer's destination equals the given one.
+  */
+ boolean isUsingDestination(JmsDestination destination) {
+     JmsDestination own = consumerInfo.getDestination();
+     return own.equals(destination);
+ }
+
+ /**
+  * @return the number of messages currently held in the local queue.
+  */
+ protected int getMessageQueueSize() {
+     return messageQueue.size();
+ }
+
+ /**
+  * @return the noLocal flag stored in this consumer's info object.
+  * @throws IllegalStateException
+  *         declared for callers; the visible body performs no closed check.
+  */
+ public boolean getNoLocal() throws IllegalStateException {
+     return consumerInfo.isNoLocal();
+ }
+
+ /**
+  * @return always false in this class; consulted when choosing the topic prefetch.
+  */
+ public boolean isDurableSubscription() {
+     final boolean durable = false;
+     return durable;
+ }
+
+ /**
+  * @return always false in this class; consulted for prefetch and pull decisions.
+  */
+ public boolean isBrowser() {
+     final boolean browser = false;
+     return browser;
+ }
+
+ /**
+  * Assigns the listener that onMessage() notifies when a message is queued
+  * and no started MessageListener takes it.
+  *
+  * @param availableListener
+  *        the listener to assign, may be {@code null}.
+  */
+ @Override
+ public void setAvailableListener(JmsMessageAvailableListener availableListener) {
+     this.availableListener = availableListener;
+ }
+
+ /**
+  * @return the currently assigned message available listener, possibly {@code null}.
+  */
+ @Override
+ public JmsMessageAvailableListener getAvailableListener() {
+     return this.availableListener;
+ }
+
+ /**
+  * Connection interruption hook: discards everything in the local message queue.
+  */
+ protected void onConnectionInterrupted() {
+     this.messageQueue.clear();
+ }
+
+ protected void onConnectionRecovery(Provider provider) throws Exception {
+ ProviderFuture request = new ProviderFuture();
+ provider.create(consumerInfo, request);
+ request.sync();
+ }
+
+ protected void onConnectionRecovered(Provider provider) throws Exception {
+ ProviderFuture request = new ProviderFuture();
+ provider.start(consumerInfo, request);
+ request.sync();
+ }
+
+ /**
+  * Connection restored hook; this class takes no action.
+  */
+ protected void onConnectionRestored() {
+     // intentionally empty
+ }
+
+ /**
+  * Triggers a pull request from the connected Provider. An attempt is made to set
+  * a timeout on the pull request however some providers will not honor this value
+  * and the pull will remain active until a message is dispatched.
+  *
+  * The pull is only issued when the local queue is empty and this consumer either
+  * has a prefetch size of zero or is a browser.
+  *
+  * The timeout value can be one of:
+  *
+  * &lt; 0 to indicate that the request should expire immediately if no message.
+  * = 0 to indicate that the request should never time out.
+  * &gt; 0 to indicate that the request should expire after the given time in milliseconds.
+  *
+  * @param timeout
+  *        The amount of time the pull request should remain valid.
+  * @throws JMSException
+  *         if the connection fails to issue the pull request.
+  */
+ protected void sendPullCommand(long timeout) throws JMSException {
+ if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+ connection.pull(getConsumerId(), timeout);
+ }
+ }
+
+ /**
+  * Picks the prefetch value from the policy that matches this consumer: for
+  * topics the durable or plain topic value, otherwise the queue browser or
+  * plain queue value.
+  *
+  * @param destination
+  *        the destination the consumer is attached to.
+  * @param policy
+  *        the prefetch policy to read from.
+  * @return the prefetch size to configure.
+  */
+ private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) {
+     if (destination.isTopic()) {
+         if (isDurableSubscription()) {
+             return policy.getDurableTopicPrefetch();
+         }
+         return policy.getTopicPrefetch();
+     }
+
+     if (isBrowser()) {
+         return policy.getQueueBrowserPrefetch();
+     }
+     return policy.getQueuePrefetch();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
new file mode 100644
index 0000000..602e8b0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Receiver of inbound message dispatches.
+ */
+public interface JmsMessageDispatcher {
+
+ /**
+  * Called when a new Message delivery is in progress.
+  *
+  * @param envelope
+  *        the incoming message dispatch information.
+  */
+ void onMessage(JmsInboundMessageDispatch envelope);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
new file mode 100644
index 0000000..4d09c04
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+
+/**
+ * Implementation of a Jms MessageProducer
+ */
+public class JmsMessageProducer implements MessageProducer {
+
+ // Session that created this producer; all sends are routed through it.
+ protected final JmsSession session;
+ // Connection obtained from the session in the constructor.
+ protected final JmsConnection connection;
+ // Producer meta-data; replaced in the constructor by the value createResource() returns.
+ protected JmsProducerInfo producerInfo;
+ // True when the producer was created with a null destination.
+ protected final boolean flexibleDestination;
+ // Defaults applied by the send() overloads that do not take explicit values.
+ protected int deliveryMode = DeliveryMode.PERSISTENT;
+ protected int priority = Message.DEFAULT_PRIORITY;
+ protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ // Set once by shutdown(); checked by checkClosed().
+ protected final AtomicBoolean closed = new AtomicBoolean();
+ // Flags passed along to the session on every send.
+ protected boolean disableMessageId;
+ protected boolean disableTimestamp;
+ // Source of the values returned by getNextMessageSequence().
+ protected final AtomicLong messageSequence = new AtomicLong();
+
+ /**
+  * Creates a producer and registers its producer info with the connection via
+  * createResource(). A {@code null} destination makes the producer flexible,
+  * i.e. not bound to a single destination.
+  *
+  * @param producerId
+  *        the id assigned to the new producer.
+  * @param session
+  *        the session that owns the producer.
+  * @param destination
+  *        the fixed destination, or {@code null}.
+  * @throws JMSException
+  *         if the connection fails to create the producer resource.
+  */
+ protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
+     this.session = session;
+     this.connection = session.getConnection();
+     this.flexibleDestination = (destination == null);
+
+     JmsProducerInfo info = new JmsProducerInfo(producerId);
+     this.producerInfo = info;
+     info.setDestination(destination);
+     this.producerInfo = session.getConnection().createResource(info);
+ }
+
+ /**
+  * Closes the producer unless it is already closed.
+  *
+  * @throws JMSException
+  *         if closing the producer fails.
+  *
+  * @see javax.jms.MessageProducer#close()
+  */
+ @Override
+ public void close() throws JMSException {
+     if (closed.get()) {
+         return;
+     }
+     doClose();
+ }
+
+ /**
+  * Shuts the producer down locally and then asks the connection to destroy
+  * the registered producer resource.
+  *
+  * @throws JMSException
+  *         if shutdown() or the destroy request fails.
+  */
+ protected void doClose() throws JMSException {
+     shutdown();
+     connection.destroyResource(this.producerInfo);
+ }
+
+ /**
+  * Releases the producer without sending a destroy request: marks it closed
+  * and, if this call performed the transition, removes it from the session.
+  * Per the original note this is most commonly needed when the parent Session
+  * is closing.
+  *
+  * @throws JMSException
+  *         declared for callers; nothing in the visible body raises it directly.
+  */
+ protected void shutdown() throws JMSException {
+     boolean closedByThisCall = closed.compareAndSet(false, true);
+     if (closedByThisCall) {
+         session.remove(this);
+     }
+ }
+
+ /**
+  * @return the default delivery mode of this producer.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getDeliveryMode()
+  */
+ @Override
+ public int getDeliveryMode() throws JMSException {
+     checkClosed();
+     return deliveryMode;
+ }
+
+ /**
+  * @return the destination stored in this producer's info object.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getDestination()
+  */
+ @Override
+ public Destination getDestination() throws JMSException {
+     checkClosed();
+     return producerInfo.getDestination();
+ }
+
+ /**
+  * @return true if message ids are disabled for this producer.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getDisableMessageID()
+  */
+ @Override
+ public boolean getDisableMessageID() throws JMSException {
+     checkClosed();
+     return disableMessageId;
+ }
+
+ /**
+  * @return true if message timestamps are disabled for this producer.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getDisableMessageTimestamp()
+  */
+ @Override
+ public boolean getDisableMessageTimestamp() throws JMSException {
+     checkClosed();
+     return disableTimestamp;
+ }
+
+ /**
+  * @return the default priority of this producer.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getPriority()
+  */
+ @Override
+ public int getPriority() throws JMSException {
+     checkClosed();
+     return priority;
+ }
+
+ /**
+  * @return the default time to live of this producer.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#getTimeToLive()
+  */
+ @Override
+ public long getTimeToLive() throws JMSException {
+     checkClosed();
+     return timeToLive;
+ }
+
+ /**
+  * Sends the message to this producer's own destination using the producer's
+  * default delivery mode, priority and time to live.
+  *
+  * @param message
+  *        the message to send.
+  * @throws JMSException
+  *         if the send fails.
+  * @see javax.jms.MessageProducer#send(javax.jms.Message)
+  */
+ @Override
+ public void send(Message message) throws JMSException {
+     send(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+  * Sends the message to the given destination using the producer's default
+  * delivery mode, priority and time to live.
+  *
+  * @param destination
+  *        the destination to send to.
+  * @param message
+  *        the message to send.
+  * @throws JMSException
+  *         if the send fails.
+  * @see javax.jms.MessageProducer#send(javax.jms.Destination,
+  *      javax.jms.Message)
+  */
+ @Override
+ public void send(Destination destination, Message message) throws JMSException {
+     send(destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
+ */
+ @Override
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ send(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * @param destination
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Destination,
+ * javax.jms.Message, int, int, long)
+ */
+ @Override
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ checkClosed();
+
+ if (destination == null) {
+ throw new InvalidDestinationException("Don't understand null destinations");
+ }
+ if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) {
+ throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName());
+ }
+
+ this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp);
+ }
+
+ /**
+  * Sets the default delivery mode used by the send() overloads that take no
+  * explicit mode. Only the two modes defined by JMS are accepted; any other
+  * value was previously stored unchecked.
+  *
+  * @param deliveryMode
+  *        {@link DeliveryMode#PERSISTENT} or {@link DeliveryMode#NON_PERSISTENT}.
+  * @throws JMSException
+  *         if the producer is closed or the value is not a legal delivery mode.
+  * @see javax.jms.MessageProducer#setDeliveryMode(int)
+  */
+ @Override
+ public void setDeliveryMode(int deliveryMode) throws JMSException {
+     checkClosed();
+     if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
+         throw new JMSException("Invalid DeliveryMode specified: " + deliveryMode);
+     }
+     this.deliveryMode = deliveryMode;
+ }
+
+ /**
+  * Sets whether message ids are disabled for this producer.
+  *
+  * @param value
+  *        true to disable message ids.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#setDisableMessageID(boolean)
+  */
+ @Override
+ public void setDisableMessageID(boolean value) throws JMSException {
+     checkClosed();
+     disableMessageId = value;
+ }
+
+ /**
+  * Sets whether message timestamps are disabled for this producer.
+  *
+  * @param value
+  *        true to disable message timestamps.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#setDisableMessageTimestamp(boolean)
+  */
+ @Override
+ public void setDisableMessageTimestamp(boolean value) throws JMSException {
+     checkClosed();
+     disableTimestamp = value;
+ }
+
+ /**
+  * Sets the default priority used by the send() overloads that take no
+  * explicit priority. JMS defines priorities from 0 (lowest) to 9 (highest);
+  * values outside that range were previously stored unchecked.
+  *
+  * @param defaultPriority
+  *        the new default priority, 0 through 9.
+  * @throws JMSException
+  *         if the producer is closed or the value is outside 0..9.
+  * @see javax.jms.MessageProducer#setPriority(int)
+  */
+ @Override
+ public void setPriority(int defaultPriority) throws JMSException {
+     checkClosed();
+     if (defaultPriority < 0 || defaultPriority > 9) {
+         throw new JMSException("Priority value given {" + defaultPriority + "} is out of range (0..9)");
+     }
+     this.priority = defaultPriority;
+ }
+
+ /**
+  * Sets the default time to live used by the send() overloads that take no
+  * explicit value.
+  *
+  * @param timeToLive
+  *        the new default time to live.
+  * @throws JMSException
+  *         if the producer is closed.
+  * @see javax.jms.MessageProducer#setTimeToLive(long)
+  */
+ @Override
+ public void setTimeToLive(long timeToLive) throws JMSException {
+     checkClosed();
+     this.timeToLive = timeToLive;
+ }
+
+ /**
+ * @param destination
+ * the destination to set
+ * @throws JMSException
+ * @throws InvalidDestinationException
+ */
+ public void setDestination(Destination destination) throws JMSException {
+ if (destination == null) {
+ throw new InvalidDestinationException("Don't understand null destinations");
+ }
+ if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) {
+ throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName());
+ }
+ producerInfo.setDestination(JmsMessageTransformation.transformDestination(session.getConnection(), destination));
+ }
+
+ /**
+  * @return the producer id held by this producer's info object.
+  */
+ protected JmsProducerId getProducerId() {
+     return producerInfo.getProducerId();
+ }
+
+ /**
+  * Atomically increments the producer's sequence counter.
+  *
+  * @return the next logical sequence for a Message sent from this Producer.
+  */
+ protected long getNextMessageSequence() {
+     return messageSequence.incrementAndGet();
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (closed.get()) {
+ throw new IllegalStateException("The MessageProducer is closed");
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Connection interruption handlers.
+ ////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Connection interruption hook; this class takes no action.
+  */
+ protected void onConnectionInterrupted() {
+     // intentionally empty
+ }
+
+ protected void onConnectionRecovery(Provider provider) throws Exception {
+ ProviderFuture request = new ProviderFuture();
+ provider.create(producerInfo, request);
+ request.sync();
+ }
+
+ /**
+  * Connection recovered hook; this class takes no action.
+  *
+  * @param provider
+  *        the provider that was recovered (unused here).
+  * @throws Exception
+  *         declared for overriding classes; never thrown by this body.
+  */
+ protected void onConnectionRecovered(Provider provider) throws Exception {
+     // intentionally empty
+ }
+
+ /**
+  * Connection restored hook; this class takes no action.
+  */
+ protected void onConnectionRestored() {
+     // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
new file mode 100644
index 0000000..c1212f2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the prefetch message policies for different types of consumers.
+ * <p>
+ * Every value assigned through one of the setters is capped at the currently
+ * configured maximum prefetch size, which starts out as {@link #MAX_PREFETCH_SIZE}.
+ */
+public class JmsPrefetchPolicy implements Serializable {
+
+    private static final long serialVersionUID = 5298685386681646744L;
+
+    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
+    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsPrefetchPolicy.class);
+
+    private int queuePrefetch;
+    private int queueBrowserPrefetch;
+    private int topicPrefetch;
+    private int durableTopicPrefetch;
+    private int maxPrefetchSize = MAX_PREFETCH_SIZE;
+
+    /**
+     * Initialize default prefetch policies
+     */
+    public JmsPrefetchPolicy() {
+        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+    }
+
+    /**
+     * Creates a new JmsPrefetchPolicy instance copied from the source policy.
+     * The configured maximum prefetch size is copied as well so that the copy
+     * applies the same cap as the source.
+     *
+     * @param source
+     *        The policy instance to copy values from.
+     */
+    public JmsPrefetchPolicy(JmsPrefetchPolicy source) {
+        this.maxPrefetchSize = source.getMaxPrefetchSize();
+        this.queuePrefetch = source.getQueuePrefetch();
+        this.queueBrowserPrefetch = source.getQueueBrowserPrefetch();
+        this.topicPrefetch = source.getTopicPrefetch();
+        this.durableTopicPrefetch = source.getDurableTopicPrefetch();
+    }
+
+    /**
+     * @return Returns the durableTopicPrefetch.
+     */
+    public int getDurableTopicPrefetch() {
+        return durableTopicPrefetch;
+    }
+
+    /**
+     * Sets the durable topic prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param durableTopicPrefetch
+     *        The durableTopicPrefetch to set.
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
+    }
+
+    /**
+     * @return Returns the queuePrefetch.
+     */
+    public int getQueuePrefetch() {
+        return queuePrefetch;
+    }
+
+    /**
+     * Sets the queue prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param queuePrefetch
+     *        The queuePrefetch to set.
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
+    }
+
+    /**
+     * @return Returns the queueBrowserPrefetch.
+     */
+    public int getQueueBrowserPrefetch() {
+        return queueBrowserPrefetch;
+    }
+
+    /**
+     * Sets the queue browser prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param queueBrowserPrefetch
+     *        The queueBrowserPrefetch to set.
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
+    }
+
+    /**
+     * @return Returns the topicPrefetch.
+     */
+    public int getTopicPrefetch() {
+        return topicPrefetch;
+    }
+
+    /**
+     * Sets the topic prefetch value, this value is limited by the max
+     * prefetch size setting.
+     *
+     * @param topicPrefetch
+     *        The topicPrefetch to set.
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
+    }
+
+    /**
+     * Gets the currently configured max prefetch size value.
+     * @return the currently configured max prefetch value.
+     */
+    public int getMaxPrefetchSize() {
+        return maxPrefetchSize;
+    }
+
+    /**
+     * Sets the maximum prefetch size value. Prefetch values that were already
+     * assigned are not re-evaluated; the new cap applies to later setter calls.
+     *
+     * @param maxPrefetchSize
+     *        The maximum allowed value for any of the prefetch size options.
+     */
+    public void setMaxPrefetchSize(int maxPrefetchSize) {
+        this.maxPrefetchSize = maxPrefetchSize;
+    }
+
+    /**
+     * Sets the prefetch values for all options in this policy to the set limit. If the value
+     * given is larger than the max prefetch value of this policy the new limit will be capped
+     * at the max prefetch value.
+     *
+     * @param prefetch
+     *        The prefetch value to apply to all prefetch limits.
+     */
+    public void setAll(int prefetch) {
+        // Cap once so that an oversized value produces a single warning, not four.
+        final int limited = getMaxPrefetchLimit(prefetch);
+        this.durableTopicPrefetch = limited;
+        this.queueBrowserPrefetch = limited;
+        this.queuePrefetch = limited;
+        this.topicPrefetch = limited;
+    }
+
+    /**
+     * Two policies are equal when their four prefetch values match; the configured
+     * maximum prefetch size does not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof JmsPrefetchPolicy) {
+            JmsPrefetchPolicy other = (JmsPrefetchPolicy) object;
+            return this.queuePrefetch == other.queuePrefetch && this.queueBrowserPrefetch == other.queueBrowserPrefetch
+                && this.topicPrefetch == other.topicPrefetch && this.durableTopicPrefetch == other.durableTopicPrefetch;
+        }
+        return false;
+    }
+
+    /**
+     * Hash code built from the same four prefetch values that {@link #equals(Object)}
+     * compares, so that equal policies always produce equal hash codes.
+     */
+    @Override
+    public int hashCode() {
+        int result = queuePrefetch;
+        result = 31 * result + queueBrowserPrefetch;
+        result = 31 * result + topicPrefetch;
+        result = 31 * result + durableTopicPrefetch;
+        return result;
+    }
+
+    /**
+     * Caps the given value at the currently configured maximum prefetch size and
+     * logs a warning when the value had to be reduced.
+     *
+     * @param value
+     *        the requested prefetch value.
+     *
+     * @return the smaller of the requested value and the configured maximum.
+     */
+    private int getMaxPrefetchLimit(int value) {
+        int result = Math.min(value, maxPrefetchSize);
+        if (result < value) {
+            // Report the cap that was actually applied, which may differ from MAX_PREFETCH_SIZE.
+            LOG.warn("maximum prefetch limit has been reset from {} to {}", value, result);
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
new file mode 100644
index 0000000..d9e397c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Queue;
+
+/**
+ * A {@link JmsDestination} that implements the JMS {@link Queue} interface.
+ */
+public class JmsQueue extends JmsDestination implements Queue {
+
+    /**
+     * Creates a queue without a name.
+     */
+    public JmsQueue() {
+        super(null, false, false);
+    }
+
+    /**
+     * Creates a queue with the given name.
+     *
+     * @param name
+     *        the name handed to the JmsDestination constructor.
+     */
+    public JmsQueue(String name) {
+        super(name, false, false);
+    }
+
+    /**
+     * Builds a new JmsQueue and transfers this queue's properties onto it.
+     *
+     * @return the newly created JmsQueue carrying this instance's properties.
+     */
+    @Override
+    public JmsQueue copy() {
+        final JmsQueue duplicate = new JmsQueue();
+        duplicate.setProperties(this.getProperties());
+        return duplicate;
+    }
+
+    /**
+     * @return the name of this destination, as reported by getName().
+     * @see javax.jms.Queue#getQueueName()
+     */
+    @Override
+    public String getQueueName() {
+        final String queueName = getName();
+        return queueName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
new file mode 100644
index 0000000..ce20d42
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a queue without
+ * removing them.
+ * <p>
+ * The <CODE>getEnumeration</CODE> method returns a <CODE>
+ * java.util.Enumeration</CODE> that is used to scan the queue's messages. It may be an
+ * enumeration of the entire content of a queue, or it may contain only the messages matching a
+ * message selector.
+ * <p>
+ * Messages may be arriving and expiring while the scan is done. The JMS API does not require
+ * the content of an enumeration to be a static snapshot of queue content. Whether these changes
+ * are visible or not depends on the JMS provider.
+ * <p>
+ * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
+ * </CODE> or a <CODE>QueueSession</CODE>.
+ * <p>
+ * This browser is its own {@link Enumeration}: {@link #getEnumeration()} returns
+ * <CODE>this</CODE> after lazily creating the underlying consumer.
+ *
+ * @see javax.jms.Session#createBrowser
+ * @see javax.jms.QueueSession#createBrowser
+ * @see javax.jms.QueueBrowser
+ * @see javax.jms.QueueReceiver
+ */
+public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowser.class);
+
+    private final JmsSession session;
+    private final JmsDestination destination;
+    private final String selector;
+
+    private JmsMessageConsumer consumer;
+    private final AtomicBoolean browseDone = new AtomicBoolean(false);
+
+    private Message next;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final Object semaphore = new Object();
+
+    /**
+     * Constructor for an JmsQueueBrowser - used internally
+     *
+     * @param session
+     *        the session that owns this browser.
+     * @param destination
+     *        the destination whose messages are browsed.
+     * @param selector
+     *        the message selector reported by {@link #getMessageSelector()} and passed
+     *        to the consumer created for browsing.
+     * @throws javax.jms.JMSException
+     *         declared by the signature; nothing in this constructor throws it.
+     */
+    protected JmsQueueBrowser(JmsSession session, JmsDestination destination, String selector) throws JMSException {
+        this.session = session;
+        this.destination = destination;
+        this.selector = selector;
+    }
+
+    /**
+     * Closes and discards the browsing consumer, committing the session first when
+     * it is transacted. A failure is logged and leaves the consumer reference in place.
+     */
+    private void destroyConsumer() {
+        if (consumer == null) {
+            return;
+        }
+        try {
+            if (session.getTransacted()) {
+                session.commit();
+            }
+            consumer.close();
+            consumer = null;
+        } catch (JMSException e) {
+            // Route the failure through the class logger instead of dumping to stderr.
+            LOG.warn("Error while destroying the browser's consumer", e);
+        }
+    }
+
+    /**
+     * Gets an enumeration for browsing the current queue messages in the order they would be
+     * received.
+     *
+     * @return an enumeration for browsing the messages
+     * @throws javax.jms.JMSException
+     *         if the JMS provider fails to get the enumeration for this browser due to some
+     *         internal error.
+     */
+    @Override
+    public Enumeration<Message> getEnumeration() throws JMSException {
+        checkClosed();
+        if (consumer == null) {
+            consumer = createConsumer();
+        }
+        return this;
+    }
+
+    /**
+     * @throws IllegalStateException
+     *         if this browser has been closed.
+     */
+    private void checkClosed() throws IllegalStateException {
+        if (closed.get()) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    /**
+     * Polls the browsing consumer until a message is available, the browse is reported
+     * done, or the session is no longer started. Between polls the call waits on the
+     * internal semaphore, see {@link #waitForMessage()}.
+     *
+     * @return true if more messages to process
+     */
+    @Override
+    public boolean hasMoreElements() {
+        while (true) {
+            synchronized (this) {
+                if (consumer == null) {
+                    return false;
+                }
+            }
+
+            if (next == null) {
+                try {
+                    next = consumer.receiveNoWait();
+                } catch (JMSException e) {
+                    LOG.warn("Error while receive the next message: {}", e.getMessage());
+                    // TODO - Add client internal error listener.
+                    // this.session.connection.onClientInternalException(e);
+                }
+
+                if (next != null) {
+                    return true;
+                }
+            } else {
+                return true;
+            }
+
+            if (browseDone.get() || !session.isStarted()) {
+                destroyConsumer();
+                return false;
+            }
+
+            waitForMessage();
+        }
+    }
+
+    /**
+     * Hands out the message fetched by {@link #hasMoreElements()}.
+     *
+     * @return the next message if one exists; null when there is no active consumer, or
+     *         when the browse has finished or the session is not started.
+     *
+     * @throws NoSuchElementException if no more elements are available.
+     */
+    @Override
+    public Message nextElement() {
+        synchronized (this) {
+            if (consumer == null) {
+                return null;
+            }
+        }
+
+        if (hasMoreElements()) {
+            Message message = next;
+            next = null;
+            return message;
+        }
+
+        if (browseDone.get() || !session.isStarted()) {
+            destroyConsumer();
+            return null;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Closes this browser; only the first call has an effect. Marks the browse as done
+     * and destroys the browsing consumer.
+     */
+    @Override
+    public void close() throws JMSException {
+        if (closed.compareAndSet(false, true)) {
+            browseDone.set(true);
+            destroyConsumer();
+        }
+    }
+
+    /**
+     * Gets the queue associated with this queue browser.
+     *
+     * @return the queue
+     * @throws javax.jms.JMSException
+     *         if the JMS provider fails to get the queue associated with this browser due to
+     *         some internal error.
+     */
+    @Override
+    public Queue getQueue() throws JMSException {
+        return (Queue) destination;
+    }
+
+    /**
+     * @return the message selector this browser was created with.
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return selector;
+    }
+
+    /**
+     * Wait on a semaphore for a fixed amount of time for a message to come in.
+     * If the waiting thread is interrupted its interrupt status is restored.
+     */
+    protected void waitForMessage() {
+        try {
+            synchronized (semaphore) {
+                semaphore.wait(2000);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Wakes every thread currently blocked in {@link #waitForMessage()}.
+     */
+    protected void notifyMessageAvailable() {
+        synchronized (semaphore) {
+            semaphore.notifyAll();
+        }
+    }
+
+    @Override
+    public String toString() {
+        // Read the field once so the null check and the id lookup see the same reference.
+        JmsMessageConsumer consumer = this.consumer;
+        return "JmsQueueBrowser { value=" + (consumer != null ? consumer.getConsumerId() : "null") + " }";
+    }
+
+    /**
+     * Creates and initializes the consumer used for browsing. The consumer reports itself
+     * as a browser, treats a dispatch without a message as the end of the browse, and
+     * wakes any waiting caller after every dispatch.
+     */
+    private JmsMessageConsumer createConsumer() throws JMSException {
+        browseDone.set(false);
+        JmsMessageConsumer rc = new JmsMessageConsumer(session.getNextConsumerId(), session, destination, selector, false) {
+
+            @Override
+            public boolean isBrowser() {
+                return true;
+            }
+
+            @Override
+            public void onMessage(JmsInboundMessageDispatch envelope) {
+                if (envelope.getMessage() == null) {
+                    browseDone.set(true);
+                } else {
+                    super.onMessage(envelope);
+                }
+                notifyMessageAvailable();
+            }
+        };
+        rc.init();
+        return rc;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
new file mode 100644
index 0000000..39eadeb
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.util.IdGenerator;
+
+/**
+ * JmsConnection variant whose topic-related operations always fail with a
+ * {@link javax.jms.IllegalStateException}.
+ */
+public class JmsQueueConnection extends JmsConnection {
+
+    /** Message carried by every exception thrown from the rejected operations. */
+    private static final String NOT_SUPPORTED = "Operation not supported by a QueueConnection";
+
+    /**
+     * Passes all arguments straight through to the JmsConnection constructor.
+     *
+     * @param connectionId
+     *        forwarded to the superclass constructor.
+     * @param provider
+     *        forwarded to the superclass constructor.
+     * @param clientIdGenerator
+     *        forwarded to the superclass constructor.
+     *
+     * @throws JMSException
+     *         if the superclass constructor fails.
+     */
+    public JmsQueueConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+        super(connectionId, provider, clientIdGenerator);
+    }
+
+    /**
+     * Always rejected on this connection type.
+     *
+     * @throws javax.jms.IllegalStateException
+     *         on every call.
+     */
+    @Override
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        throw new javax.jms.IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on this connection type.
+     *
+     * @throws javax.jms.IllegalStateException
+     *         on every call.
+     */
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        throw new javax.jms.IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on this connection type.
+     *
+     * @throws javax.jms.IllegalStateException
+     *         on every call.
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        throw new javax.jms.IllegalStateException(NOT_SUPPORTED);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
new file mode 100644
index 0000000..aa1da65
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * JmsMessageConsumer that additionally implements the JMS {@link QueueReceiver} interface.
+ */
+public class JmsQueueReceiver extends JmsMessageConsumer implements QueueReceiver {
+
+    /**
+     * Creates the receiver by delegating to the JmsMessageConsumer constructor,
+     * passing {@code false} as its final argument.
+     *
+     * @param id
+     *        This receiver's assigned Id.
+     * @param session
+     *        The session that created this receiver.
+     * @param dest
+     *        The destination that this receiver listens on.
+     * @param selector
+     *        The selector used to filter messages for this receiver.
+     *
+     * @throws JMSException
+     *         if the superclass constructor fails.
+     */
+    protected JmsQueueReceiver(JmsConsumerId id, JmsSession session, JmsDestination dest, String selector) throws JMSException {
+        super(id, session, dest, selector, false);
+    }
+
+    /**
+     * Returns this receiver's destination as a Queue after verifying the receiver is open.
+     *
+     * @return the destination of this receiver, cast to Queue.
+     * @throws IllegalStateException
+     *         if the closed check fails.
+     * @see javax.jms.QueueReceiver#getQueue()
+     */
+    @Override
+    public Queue getQueue() throws IllegalStateException {
+        checkClosed();
+        final Queue queue = (Queue) getDestination();
+        return queue;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
new file mode 100644
index 0000000..c2276c8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.apache.qpid.jms.meta.JmsProducerId;
+
+/**
+ * Implementation of a Queue Sender
+ */
+public class JmsQueueSender extends JmsMessageProducer implements QueueSender {
+
+    /**
+     * Constructor
+     *
+     * @param id
+     *        the producer id passed on to the JmsMessageProducer constructor.
+     * @param session
+     *        the session passed on to the JmsMessageProducer constructor.
+     * @param destination
+     *        the destination passed on to the JmsMessageProducer constructor.
+     *
+     * @throws JMSException
+     *         if the superclass constructor fails.
+     */
+    protected JmsQueueSender(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException {
+        super(id, session, destination);
+    }
+
+    /**
+     * @return the destination held by this producer's info object, cast to Queue.
+     * @throws IllegalStateException
+     *         if the closed check fails.
+     * @see javax.jms.QueueSender#getQueue()
+     */
+    @Override
+    public Queue getQueue() throws IllegalStateException {
+        checkClosed();
+        return (Queue) this.producerInfo.getDestination();
+    }
+
+    /**
+     * Sends the message to the given queue by delegating to the destination-taking
+     * send of the superclass.
+     *
+     * @param queue
+     *        the queue to send the message to.
+     * @param message
+     *        the message to send.
+     * @throws JMSException
+     *         if the delegated send fails.
+     * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message)
+     */
+    @Override
+    public void send(Queue queue, Message message) throws JMSException {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends the message to the given queue with explicit delivery options by delegating
+     * to the destination-taking send of the superclass.
+     *
+     * @param queue
+     *        the queue to send the message to.
+     * @param message
+     *        the message to send.
+     * @param deliveryMode
+     *        the delivery mode to use.
+     * @param priority
+     *        the priority for this message.
+     * @param timeToLive
+     *        the message's lifetime (in milliseconds).
+     * @throws JMSException
+     *         if the delegated send fails.
+     * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message, int, int, long)
+     */
+    @Override
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        // The queue must be forwarded; the destination-less overload would ignore the
+        // caller's target and use the producer's own destination instead.
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
new file mode 100644
index 0000000..274c0a7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.meta.JmsSessionId;
+
+/**
+ * JMS QueueSession implementation. Topic-related operations are rejected with an
+ * {@link IllegalStateException}; consumer and producer creation is only rejected
+ * when the destination is a {@link Topic}.
+ */
+public class JmsQueueSession extends JmsSession {
+
+    /** Message carried by every exception thrown from the rejected operations. */
+    private static final String NOT_SUPPORTED = "Operation not supported by a QueueSession";
+
+    /**
+     * Passes all arguments straight through to the JmsSession constructor.
+     *
+     * @param connection
+     *        forwarded to the superclass constructor.
+     * @param sessionId
+     *        forwarded to the superclass constructor.
+     * @param acknowledgementMode
+     *        forwarded to the superclass constructor.
+     *
+     * @throws JMSException
+     *         if the superclass constructor fails.
+     */
+    protected JmsQueueSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+        super(connection, sessionId, acknowledgementMode);
+    }
+
+    /**
+     * @param destination
+     *        the destination to consume from; must not be a Topic.
+     * @return the consumer created by the superclass.
+     * @throws JMSException
+     *         IllegalStateException if the destination is a Topic, otherwise whatever
+     *         the superclass throws.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new IllegalStateException(NOT_SUPPORTED);
+        }
+        return super.createConsumer(destination);
+    }
+
+    /**
+     * @param destination
+     *        the destination to consume from; must not be a Topic.
+     * @param messageSelector
+     *        the selector passed on to the superclass.
+     * @return the consumer created by the superclass.
+     * @throws JMSException
+     *         IllegalStateException if the destination is a Topic, otherwise whatever
+     *         the superclass throws.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new IllegalStateException(NOT_SUPPORTED);
+        }
+        return super.createConsumer(destination, messageSelector);
+    }
+
+    /**
+     * Behaves like the other createConsumer overloads: only Topic destinations are
+     * rejected, every other destination is handed to the superclass.
+     *
+     * @param destination
+     *        the destination to consume from; must not be a Topic.
+     * @param messageSelector
+     *        the selector passed on to the superclass.
+     * @param noLocal
+     *        the noLocal flag passed on to the superclass.
+     * @return the consumer created by the superclass.
+     * @throws JMSException
+     *         IllegalStateException if the destination is a Topic, otherwise whatever
+     *         the superclass throws.
+     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new IllegalStateException(NOT_SUPPORTED);
+        }
+        return super.createConsumer(destination, messageSelector, noLocal);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param name
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param name
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @param noLocal
+     *        ignored.
+     * @return never returns normally.
+     * @throws IllegalStateException
+     *         on every call.
+     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+     *      java.lang.String, java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws IllegalStateException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * @param destination
+     *        the destination to produce to; must not be a Topic.
+     * @return the producer created by the superclass.
+     * @throws JMSException
+     *         IllegalStateException if the destination is a Topic, otherwise whatever
+     *         the superclass throws.
+     * @see javax.jms.Session#createProducer(javax.jms.Destination)
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new IllegalStateException(NOT_SUPPORTED);
+        }
+        return super.createProducer(destination);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.Session#createTemporaryTopic()
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topicName
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.Session#createTopic(java.lang.String)
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param name
+     *        ignored.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.Session#unsubscribe(java.lang.String)
+     */
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+     */
+    @Override
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+
+    /**
+     * Always rejected on a QueueSession.
+     *
+     * @param topic
+     *        ignored.
+     * @param messageSelector
+     *        ignored.
+     * @param noLocal
+     *        ignored.
+     * @return never returns normally.
+     * @throws JMSException
+     *         IllegalStateException on every call.
+     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic,
+     *      java.lang.String, boolean)
+     */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+        throw new IllegalStateException(NOT_SUPPORTED);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org