You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC
svn commit: r577253 [3/7] - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
nclient/ nclient/impl/ nclient/util/ njms/ njms/message/
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,662 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.njms.message.MessageFactory;
+import org.apache.qpidity.njms.message.QpidMessage;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+
+/**
+ * Implementation of JMS message consumer
+ */
+public class MessageConsumerImpl extends MessageActor
+ implements MessageConsumer, org.apache.qpidity.nclient.util.MessageListener
+{
+ // we can receive up to 100 messages for an asynchronous listener
+ public static final int MAX_MESSAGE_TRANSFERRED = 100;
+
+ /**
+ * This MessageConsumer's messageselector.
+ */
+ private String _messageSelector = null;
+
+ /**
+ * The message selector filter associated with this consumer message selector
+ */
+ private MessageFilter _filter = null;
+
+ /**
+ * NoLocal
+ * If true, and the destination is a topic then inhibits the delivery of messages published
+ * by its own connection. The behavior for NoLocal is not specified if the destination is a queue.
+ */
+ protected boolean _noLocal;
+
+ /**
+ * The subscription name
+ */
+ protected String _subscriptionName;
+
+ /**
+ * Indicates whether this consumer receives pre-acquired messages
+ */
+ private boolean _preAcquire = true;
+
+ /**
+ * The JMS MessageListener set up for this consumer.
+ */
+ private MessageListener _messageListener;
+
+ /**
+ * A lock on the synchronous message
+ */
+ private final Object _incomingMessageLock = new Object();
+
+
+ /**
+ * Number of messages received asynchronously.
+ * Never exceeds MAX_MESSAGE_TRANSFERRED.
+ */
+ private int _messageAsyncrhonouslyReceived = 0;
+
+ private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
+
+ //----- Constructors
+ /**
+ * Create a new MessageConsumerImpl and subscribe it to its destination.
+ * <p> For a queue destination the consumer subscribes directly to the queue; messages are
+ * pre-acquired unless a message selector is set. For any other destination a queue is declared
+ * (durable when a subscription name is given, auto-delete otherwise), bound to the topic
+ * exchange and subscribed to exclusively.
+ * <p> The subscription uses credit flow mode and starts with a single message credit.
+ *
+ * @param session The session from which this MessageConsumerImpl is instantiated
+ * @param destination The destination this MessageConsumerImpl consumes from
+ * @param messageSelector The message selector for this MessageConsumerImpl, or null for no selector.
+ * @param noLocal If true inhibits the delivery of messages published by its own connection.
+ * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+ * If this value is null, a non-durable subscription is created.
+ * @param consumerTag This actor ID
+ * @throws Exception If the MessageConsumerImpl cannot be created due to some internal error.
+ */
+ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
+ boolean noLocal, String subscriptionName, String consumerTag) throws Exception
+ {
+ super(session, destination, consumerTag);
+ if (messageSelector != null)
+ {
+ _messageSelector = messageSelector;
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
+ _noLocal = noLocal;
+ _subscriptionName = subscriptionName;
+ _isStopped = getSession().isStopped();
+ // let's create a message part assembler
+
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
+
+ if (destination instanceof Queue)
+ {
+ // this is a queue we expect that this queue exists
+ getSession().getQpidSession()
+ .messageSubscribe(destination.getQpidQueueName(), // queue
+ getMessageActorID(), // destination
+ org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ // When the message selector is set we do not acquire the messages
+ _messageSelector != null ? org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
+ if (_messageSelector != null)
+ {
+ // record that matching messages must be acquired explicitly before delivery
+ _preAcquire = false;
+ }
+ }
+ else
+ {
+ // this is a topic we need to create a temporary queue for this consumer
+ // unless this is a durable subscriber
+ String queueName;
+ if (subscriptionName != null)
+ {
+ // this is a durable subscriber
+ // create a persistent queue for this subscriber
+ queueName = "topic-" + subscriptionName;
+ getSession().getQpidSession()
+ .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
+ }
+ else
+ {
+ // this is a non durable subscriber
+ queueName = destination.getQpidQueueName();
+ getSession().getQpidSession()
+ .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
+ }
+ // bind this queue with the topic exchange
+ getSession().getQpidSession()
+ .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
+ // subscribe to this topic
+ getSession().getQpidSession()
+ .messageSubscribe(queueName, getMessageActorID(),
+ org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ // We always acquire the messages
+ org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+ // Request exclusive subscription access, meaning only this subscription
+ // can access the queue.
+ Option.EXCLUSIVE);
+
+ }
+ // set the flow mode
+ getSession().getQpidSession()
+ .messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
+
+ // this will prevent the broker from sending more than one message
+ // When a messageListener is set the flow will be adjusted.
+ // until then we assume it's for synchronous message consumption
+ requestCredit(1);
+ requestSync();
+ // check for an exception
+ if (getSession().getCurrentException() != null)
+ {
+ throw getSession().getCurrentException();
+ }
+ }
+
+ //----- Message consumer API
+ /**
+ * Gets this MessageConsumer's message selector.
+ * <p> NOTE(review): the constructor stores any non-null selector unchanged, so an empty
+ * string selector appears to be returned as "" rather than null - confirm against callers.
+ *
+ * @return This MessageConsumer's message selector, or null if no
+ * message selector exists for the message consumer (that is, if
+ * the message selector was not set or was set to null)
+ * @throws JMSException if getting the message selector fails due to some internal error.
+ */
+ public String getMessageSelector() throws JMSException
+ {
+ checkNotClosed();
+ return _messageSelector;
+ }
+
+ /**
+ * Gets this MessageConsumer's <CODE>MessageListener</CODE>.
+ *
+ * @return The listener for the MessageConsumer, or null if no listener is set
+ * @throws JMSException if getting the message listener fails due to some internal error.
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ checkNotClosed();
+ return _messageListener;
+ }
+
+ /**
+ * Sets the MessageConsumer's <CODE>MessageListener</CODE>.
+ * <p> The JMS specification says:
+ * <P>Setting the message listener to null is the equivalent of
+ * unsetting the message listener for the message consumer.
+ * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
+ * while messages are being consumed by an existing listener
+ * or the consumer is being used to consume messages synchronously
+ * is undefined.
+ * <p> When a non-null listener is set, the asynchronous message counter is reset and
+ * MAX_MESSAGE_TRANSFERRED message credits are requested from the broker.
+ *
+ * @param messageListener The listener to which the messages are to be delivered
+ * @throws JMSException If setting the message listener fails due to some internal error.
+ */
+ public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ // NOTE(review): this is the only synchronized method of this class; onMessage and
+ // getMessageListener read _messageListener without holding this monitor.
+ checkNotClosed();
+ try
+ {
+ _messageListener = messageListener;
+ if (messageListener != null)
+ {
+ // switch to asynchronous delivery: ask the broker for a full batch of messages
+ resetAsynchMessageReceived();
+ }
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages
+ *
+ * @throws QpidException If there is a communication error
+ */
+ private void resetAsynchMessageReceived() throws QpidException
+ {
+ if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+ {
+ getSession().getQpidSession().messageStop(getMessageActorID());
+ }
+ _messageAsyncrhonouslyReceived = 0;
+ requestCredit(MAX_MESSAGE_TRANSFERRED);
+ }
+
+    /**
+     * Receive the next message produced for this message consumer.
+     * <P>This call first tries to fetch a message without waiting and, if none is available,
+     * issues one message credit and blocks until a message is delivered.
+     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
+     * before the failure is reported as a JMSException.
+     * <p>NOTE(review): nothing in this method returns null once it is blocked, so a concurrent
+     * close does not appear to wake the caller - confirm against the close implementation.
+     *
+     * @return The next message produced for this message consumer.
+     * @throws JMSException If receiving the next message fails due to some internal error
+     *                      or the waiting thread is interrupted.
+     */
+    public Message receive() throws JMSException
+    {
+        // Check if we can get a message immediately
+        Message result = receiveNoWait();
+        if (result != null)
+        {
+            return result;
+        }
+        try
+        {
+            // Now issue a credit and wait for the broker to send a message.
+            // There is no point doing a credit() flush() and sync() in a loop:
+            // this would only overload the broker. After the initial try we can wait
+            // for the broker to send a message when it gets one.
+            requestCredit(1);
+            return (Message) _queue.take();
+        }
+        catch (InterruptedException e)
+        {
+            // do not lose the interruption: callers further up may rely on the flag
+            Thread.currentThread().interrupt();
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Receive the next message that arrives within the specified timeout interval.
+     * <p> This call blocks until a message arrives or the timeout expires.
+     * <p> A timeout of zero never expires, and the call blocks indefinitely; this case is
+     * delegated to {@link #receive()}.
+     * <p> A timeout less than 0 throws a JMSException.
+     * <p> If the calling thread is interrupted while waiting, its interrupt status is restored
+     * before the failure is reported as a JMSException.
+     *
+     * @param timeout The timeout value (in milliseconds)
+     * @return The next message that arrives within the specified timeout interval,
+     *         or null if the timeout expires first.
+     * @throws JMSException If the timeout is negative, if receiving the next message fails due
+     *                      to some internal error, or if the waiting thread is interrupted.
+     */
+    public Message receive(long timeout) throws JMSException
+    {
+        checkClosed();
+        checkIfListenerSet();
+        if (timeout < 0)
+        {
+            throw new JMSException("Invalid timeout value: " + timeout);
+        }
+        if (timeout == 0)
+        {
+            // BlockingQueue.poll(0, unit) returns immediately, which is the opposite of the
+            // JMS contract for a zero timeout; use the indefinitely blocking variant instead
+            return receive();
+        }
+
+        try
+        {
+            // first check if we have any in the queue already
+            Message result = (Message) _queue.poll();
+            if (result == null)
+            {
+                requestCredit(1);
+                requestFlush();
+                // We shouldn't do a sync() because the timeout can happen
+                // before the sync() returns
+                result = (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS);
+            }
+            return result;
+        }
+        catch (InterruptedException e)
+        {
+            // do not lose the interruption: callers further up may rely on the flag
+            Thread.currentThread().interrupt();
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Receive the next message if one is immediately available.
+     * <p> A message already buffered locally is returned straight away. Otherwise one credit
+     * is requested, the broker is flushed and synced, and the local buffer is checked once more.
+     *
+     * @return the next message or null if one is not available.
+     * @throws JMSException If receiving the next message fails due to some internal error.
+     */
+    public Message receiveNoWait() throws JMSException
+    {
+        checkClosed();
+        checkIfListenerSet();
+        try
+        {
+            final Message buffered = (Message) _queue.poll();
+            if (buffered != null)
+            {
+                return buffered;
+            }
+            // nothing buffered: give the broker one chance to deliver a message right now
+            requestCredit(1);
+            requestFlush();
+            requestSync();
+            return (Message) _queue.poll();
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+ // not public methods
+ /**
+ * Sends a message-flow command, in message units, for this consumer's subscription.
+ * <p> Upon receipt of this method, the broker adds "value"
+ * number of messages to the available credit balance for this consumer.
+ *
+ * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
+ */
+ private void requestCredit(int value)
+ {
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, value);
+ }
+
+ /**
+ * Sends a message-flush command for this consumer's subscription.
+ * <p> Forces the broker to exhaust its credit supply.
+ * <p> The broker's credit will always be zero when
+ * this method completes.
+ */
+ private void requestFlush()
+ {
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+
+ /**
+ * Calls sync() on the underlying Qpid session.
+ * <p> The sync method will block until all outstanding broker
+ * commands are executed.
+ */
+ private void requestSync()
+ {
+ getSession().getQpidSession().sync();
+ }
+
+ /**
+ * Rejects synchronous receive calls while this consumer is stopped.
+ * <p> NOTE(review): despite its name and its error message, this method tests the
+ * _isStopped flag, not a closed state - confirm whether a stopped consumer is really
+ * meant to throw here.
+ *
+ * @throws JMSException If this consumer is stopped.
+ */
+ private void checkClosed() throws JMSException
+ {
+ if (_isStopped)
+ {
+ throw new JMSException("Session is closed");
+ }
+ }
+
+ /**
+ * Stop the delivery of messages to this consumer.
+ * <p> Sends a message-stop command for this consumer's subscription and then marks the
+ * consumer as stopped.
+ * <p> NOTE(review): nothing in this method waits for a message listener that is currently
+ * processing a message, and _isStopped is written without holding _incomingMessageLock
+ * (unlike start()) - confirm whether that is intended.
+ *
+ * @throws Exception If the consumer cannot be stopped due to some internal error.
+ */
+ protected void stop() throws Exception
+ {
+ getSession().getQpidSession().messageStop(getMessageActorID());
+ _isStopped = true;
+ }
+
+ /**
+ * Start the delivery of messages to this consumer.
+ * <p> Only clears the stopped flag, under _incomingMessageLock; no command is sent to the
+ * broker from here.
+ *
+ * @throws Exception If the consumer cannot be started due to some internal error.
+ */
+ protected void start() throws Exception
+ {
+ synchronized (_incomingMessageLock)
+ {
+ _isStopped = false;
+ }
+ }
+
+    /**
+     * This method notifies this consumer that a message has been delivered.
+     * <p> The message is converted to a JMS message and checked against this consumer's
+     * preconditions (selector / acquisition). A message that passes is pre-processed and then
+     * either buffered for a synchronous receive call (no listener set) or handed to the
+     * message listener.
+     *
+     * @param message The received message.
+     * @throws RuntimeException If the message cannot be processed; the original failure is
+     *                          kept as the cause.
+     */
+    public void onMessage(org.apache.qpidity.api.Message message)
+    {
+        try
+        {
+            QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+            if (checkPreConditions(jmsMessage))
+            {
+                preApplicationProcessing(jmsMessage);
+
+                if (_messageListener == null)
+                {
+                    // synchronous consumption: park the message for receive()
+                    _queue.offer(jmsMessage);
+                }
+                else
+                {
+                    // The listener is invoked on the delivering thread rather than through an
+                    // additional dispatcher thread in SessionImpl; if the application blocks
+                    // on a message that is fine.
+                    notifyMessageListener(jmsMessage);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            // keep the original exception as the cause so the failure can be diagnosed
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+
+    /**
+     * Delivers a message to the message listener of this consumer.
+     * <p> The asynchronous message counter is incremented first; once it reaches
+     * MAX_MESSAGE_TRANSFERRED a new batch of credits is requested from the broker.
+     * <p> A RuntimeException thrown by the listener is logged and not propagated: the message
+     * is not redelivered.
+     *
+     * @param message The message to hand to the listener.
+     * @throws RuntimeException If requesting more messages from the broker fails; the original
+     *                          failure is kept as the cause.
+     */
+    public void notifyMessageListener(QpidMessage message) throws RuntimeException
+    {
+        try
+        {
+            _messageAsyncrhonouslyReceived++;
+            if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+            {
+                // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
+                resetAsynchMessageReceived();
+            }
+
+            // The JMS specs says:
+            /* The result of a listener throwing a RuntimeException depends on the session's
+             * acknowledgment mode.
+             * --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+             * will be immediately redelivered. The number of times a JMS provider will
+             * redeliver the same message before giving up is provider-dependent.
+             * --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+             * --- Transacted Session - the next message for the listener is delivered.
+             *
+             * The number of time we try redelivering the message is 0
+             **/
+            try
+            {
+                _messageListener.onMessage((Message) message);
+            }
+            catch (RuntimeException re)
+            {
+                // this message will not be redelivered, but do not hide the listener failure
+                _logger.error("Message listener threw an exception; the message will not be redelivered", re);
+            }
+        }
+        catch (Exception e)
+        {
+            // keep the original exception as the cause so the failure can be diagnosed
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+ /**
+ * Check whether this consumer is asynchronous, i.e. whether a message listener is set.
+ * <p> Called by the synchronous receive methods before they touch the local message buffer.
+ *
+ * @throws javax.jms.IllegalStateException If this consumer is asynchronous.
+ */
+ private void checkIfListenerSet() throws javax.jms.IllegalStateException
+ {
+
+ if (_messageListener != null)
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+ }
+
+ /**
+ * Pre-process a received message before it is handed to the application.
+ * <p> The session pre-processes the message first; on a transacted session the message is
+ * then acknowledged through the session, and finally the message's own
+ * afterMessageReceive() hook is run.
+ *
+ * @param message The message to pre-process.
+ * @throws Exception If the message cannot be pre-processed due to some internal error.
+ */
+ private void preApplicationProcessing(QpidMessage message) throws Exception
+ {
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ message.afterMessageReceive();
+ }
+
+ /**
+ * Check whether a message can be delivered to this consumer.
+ *
+ * @param message The message to be checked.
+ * @return true if the message matches the selector and can be acquired, false otherwise.
+ * @throws QpidException If the message preConditions cannot be checked due to some internal error.
+ */
+ private boolean checkPreConditions(QpidMessage message) throws QpidException
+ {
+ boolean messageOk = true;
+ if (_messageSelector != null)
+ {
+ messageOk = _filter.matches((Message) message);
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("messageOk " + messageOk);
+ _logger.debug("_preAcquire " + _preAcquire);
+ }
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
+ }
+ else if (!messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to acquire message");
+ }
+ messageOk = acquireMessage(message);
+ }
+ return messageOk;
+ }
+
+ /**
+ * Release a message; only acts when this consumer receives pre-acquired messages.
+ * <p> NOTE(review): the only call site in this class (checkPreConditions) invokes this
+ * method when _preAcquire is false, so the guard below makes that call a no-op - confirm
+ * which of the two conditions is the intended one.
+ *
+ * @param message The message to be released
+ * @throws QpidException If the message cannot be released due to some internal error.
+ */
+ private void releaseMessage(QpidMessage message) throws QpidException
+ {
+ if (_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ getSession().getQpidSession().messageRelease(ranges);
+ // surface any exception recorded on the session
+ getSession().testQpidException();
+ }
+ }
+
+ /**
+ * Acquire a message
+ *
+ * @param message The message to be acquired
+ * @return true if the message has been acquired, false otherwise.
+ * @throws QpidException If the message cannot be acquired due to some internal error.
+ */
+ private boolean acquireMessage(QpidMessage message) throws QpidException
+ {
+ boolean result = false;
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+
+ getSession().getQpidSession()
+ .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession().sync();
+ RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
+ if (acquired.size() > 0)
+ {
+ result = true;
+ }
+ getSession().testQpidException();
+ }
+ return result;
+ }
+
+ /**
+ * Acknowledge a message; only acts when this consumer does not receive pre-acquired messages.
+ * <p> NOTE(review): the only call site in this class (checkPreConditions) invokes this
+ * method when _preAcquire is true, so the guard below makes that call a no-op - confirm
+ * which of the two conditions is the intended one.
+ *
+ * @param message The message to be acknowledged
+ * @throws QpidException If the message cannot be acknowledged due to some internal error.
+ */
+ private void acknowledgeMessage(QpidMessage message) throws QpidException
+ {
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ getSession().getQpidSession().messageAcknowledge(ranges);
+ // surface any exception recorded on the session
+ getSession().testQpidException();
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,384 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.njms.message.MessageHelper;
+import org.apache.qpidity.njms.message.MessageImpl;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.*;
+import java.util.UUID;
+import java.io.IOException;
+
+/**
+ * Implements MessageProducer
+ */
+public class MessageProducerImpl extends MessageActor implements MessageProducer
+{
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ private boolean _disableTimestamps = false;
+
+ /**
+ * Priority of messages created by this producer.
+ */
+ private int _messagePriority = Message.DEFAULT_PRIORITY;
+
+ /**
+ * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ */
+ private long _timeToLive;
+
+ /**
+ * Delivery mode used for this producer.
+ */
+ private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+ /**
+ * Specifies whether the message ID is disabled
+ */
+ private boolean _disableMessageId = false;
+
+ //-- constructors
+ /**
+ * Create a new MessageProducerImpl.
+ * <p> The producer is registered with an empty actor ID.
+ *
+ * @param session The session from which the MessageProducerImpl is instantiated
+ * @param destination The default destination for this MessageProducerImpl
+ */
+ public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
+ {
+ super(session, destination,"");
+ }
+
+ //--- Interface javax.jms.MessageProducer
+ /**
+ * Sets whether message IDs are disabled.
+ *
+ * @param value Specify whether the MessageID must be disabled
+ * @throws JMSException If disabling messageID fails due to some internal error.
+ */
+ public void setDisableMessageID(boolean value) throws JMSException
+ {
+ checkNotClosed();
+ _disableMessageId = value;
+ }
+
+ /**
+ * Gets an indication of whether message IDs are disabled.
+ *
+ * @return true if message IDs are disabled, false otherwise
+ * @throws JMSException If getting whether message IDs are disabled fails due to some internal error.
+ */
+ public boolean getDisableMessageID() throws JMSException
+ {
+ checkNotClosed();
+ return _disableMessageId;
+ }
+
+ /**
+ * Sets whether message timestamps are disabled.
+ * <P> JMS spec says:
+ * <p> Since timestamps take some effort to create and increase a
+ * message's size, some JMS providers may be able to optimize message
+ * overhead if they are given a hint that the timestamp is not used by an
+ * application....
+ * these messages must have the timestamp set to zero; if the provider
+ * ignores the hint, the timestamp must be set to its normal value.
+ * <p>Message timestamps are enabled by default.
+ *
+ * @param value Indicates if message timestamps are disabled
+ * @throws JMSException if disabling the timestamps fails due to some internal error.
+ */
+ public void setDisableMessageTimestamp(boolean value) throws JMSException
+ {
+ checkNotClosed();
+ _disableTimestamps = value;
+ }
+
+ /**
+ * Gets an indication of whether message timestamps are disabled.
+ *
+ * @return an indication of whether message timestamps are disabled
+ * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
+ */
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ checkNotClosed();
+ return _disableTimestamps;
+ }
+
+    /**
+     * Sets the delivery mode this producer applies to messages by default.
+     * <p> JMS specification says:
+     * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
+     *
+     * @param deliveryMode The message delivery mode for this message producer; legal
+     *                     values are {@link DeliveryMode#NON_PERSISTENT}
+     *                     and {@link DeliveryMode#PERSISTENT}.
+     * @throws JMSException if the delivery mode is not one of the legal values or setting it
+     *                      fails due to some internal error.
+     */
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkNotClosed();
+        final boolean legalMode =
+                deliveryMode == DeliveryMode.NON_PERSISTENT || deliveryMode == DeliveryMode.PERSISTENT;
+        if (!legalMode)
+        {
+            throw new JMSException(
+                    "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
+        }
+        _deliveryMode = deliveryMode;
+    }
+
+ /**
+ * Gets the producer's delivery mode.
+ *
+ * @return The message delivery mode for this message producer
+ * @throws JMSException If getting the delivery mode fails due to some internal error.
+ */
+ public int getDeliveryMode() throws JMSException
+ {
+ checkNotClosed();
+ return _deliveryMode;
+ }
+
+ /**
+ * Sets the producer's message priority.
+ * <p> The JMS spec says:
+ * <p> The JMS API defines ten levels of priority value, with 0 as the
+ * lowest priority and 9 as the highest. Clients should consider priorities
+ * 0-4 as gradations of normal priority and priorities 5-9 as gradations
+ * of expedited priority.
+ * <p> Priority is set to 4 by default.
+ *
+ * @param priority The message priority for this message producer; must be a value between 0 and 9
+ * @throws JMSException if setting this producer priority fails due to some internal error.
+ * @throws IllegalArgumentException if the priority is outside the range 0 to 9.
+ */
+ public void setPriority(int priority) throws JMSException
+ {
+ checkNotClosed();
+ if ((priority < 0) || (priority > 9))
+ {
+ throw new IllegalArgumentException(
+ "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = priority;
+ }
+
+ /**
+ * Gets the producer's message priority.
+ *
+ * @return The message priority for this message producer.
+ * @throws JMSException If getting this producer message priority fails due to some internal error.
+ */
+ public int getPriority() throws JMSException
+ {
+ checkNotClosed();
+ return _messagePriority;
+ }
+
+ /**
+ * Sets the default length of time in milliseconds from its dispatch time
+ * that a produced message should be retained by the message system.
+ * <p> The JMS spec says that time to live must be set to zero by default.
+ *
+ * @param timeToLive The message time to live in milliseconds; zero is unlimited
+ * @throws JMSException If setting the default time to live fails due to some internal error.
+ * @throws IllegalArgumentException If the supplied time to live is negative.
+ */
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ checkNotClosed();
+ if (timeToLive < 0)
+ {
+ throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+ }
+ _timeToLive = timeToLive;
+ }
+
+ /**
+ * Gets the default length of time in milliseconds from its dispatch time
+ * that a produced message should be retained by the message system.
+ *
+ * @return The default message time to live in milliseconds; zero is unlimited
+ * @throws JMSException if getting the default time to live fails due to some internal error.
+ * @see javax.jms.MessageProducer#setTimeToLive
+ */
+ public long getTimeToLive() throws JMSException
+ {
+ checkNotClosed();
+ return _timeToLive;
+ }
+
+ /**
+ * Gets the destination associated with this producer.
+ *
+ * @return This producer's destination.
+ * @throws JMSException If getting the destination for this producer fails
+ * due to some internal error.
+ */
+ public Destination getDestination() throws JMSException
+ {
+ checkNotClosed();
+ return _destination;
+ }
+
+ /**
+ * Sends a message using the producer's default delivery mode, priority, destination
+ * and time to live.
+ *
+ * @param message the message to be sent
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If this producer destination is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If a client uses this method with a producer that did
+ * not specify a destination at creation time.
+ */
+ public void send(Message message) throws JMSException
+ {
+ send(message, _deliveryMode, _messagePriority, _timeToLive);
+ }
+
+ /**
+ * Sends a message to this producer default destination, specifying delivery mode,
+ * priority, and time to live.
+ *
+ * @param message The message to send
+ * @param deliveryMode The delivery mode to use
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If this producer's destination is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If a client uses this method with a producer that did
+ * not specify a destination at creation time.
+ */
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ send(_destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * Sends a message to a specified destination using this producer's default
+ * delivery mode, priority and time to live.
+ * <p/>
+ * <P>Typically, a message producer is assigned a destination at creation
+ * time; however, the JMS API also supports unidentified message producers,
+ * which require that the destination be supplied every time a message is
+ * sent.
+ *
+ * @param destination The destination to send this message to
+ * @param message The message to send
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If an invalid destination is specified.
+ */
+ public void send(Destination destination, Message message) throws JMSException
+ {
+ send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
+ }
+
+    /**
+     * Sends a message to a destination specifying delivery mode, priority and time to live.
+     * <p> A message that is not a {@code MessageImpl} is treated as a foreign message: it is
+     * transformed into a Qpid message and the JMS headers are also set on the supplied message
+     * so that they can be queried after the send.
+     * <p> The clock is only read when it is needed; when timestamps are disabled and no time to
+     * live is requested, the JMS timestamp is set to zero.
+     *
+     * @param destination  The destination to send this message to.
+     * @param message      The message to be sent.
+     * @param deliveryMode The delivery mode to use.
+     * @param priority     The priority for this message.
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid (null) message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     * @throws IllegalArgumentException    If the time to live is negative.
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkNotClosed();
+        getSession().checkDestination(destination);
+        if (message == null)
+        {
+            throw new MessageFormatException("Cannot send a null message");
+        }
+        // Do not allow negative timeToLive values
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        // Only get current time if required; zero is the JMS value for a disabled timestamp
+        long currentTime = 0;
+        if (!((timeToLive == 0) && _disableTimestamps))
+        {
+            currentTime = System.currentTimeMillis();
+        }
+        // an expiration of zero means the message never expires
+        final long expiration = (timeToLive != 0) ? timeToLive + currentTime : 0;
+        // the message UID
+        final String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
+
+        final MessageImpl qpidMessage;
+        if (message instanceof MessageImpl)
+        {
+            qpidMessage = (MessageImpl) message;
+        }
+        else
+        {
+            // this is a foreign message
+            qpidMessage = MessageHelper.transformMessage(message);
+            // set message's properties in case they are queried after send.
+            applySendHeaders(message, destination, deliveryMode, priority, uid, expiration, currentTime);
+            if (timeToLive != 0 && _logger.isDebugEnabled())
+            {
+                _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            }
+        }
+        // set the message properties
+        applySendHeaders(qpidMessage, destination, deliveryMode, priority, uid, expiration, currentTime);
+        qpidMessage.setRoutingKey(((DestinationImpl) destination).getDestinationName());
+        qpidMessage.setExchangeName(((DestinationImpl) destination).getExchangeName());
+        // call beforeMessageDispatch
+        try
+        {
+            qpidMessage.beforeMessageDispatch();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        try
+        {
+            getSession().getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
+                                                          qpidMessage.getQpidityMessage(),
+                                                          org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                                          org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        }
+        catch (IOException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Sets the JMS headers assigned by a send operation on the given message.
+     *
+     * @param target       The message whose headers are set.
+     * @param destination  The destination the message is sent to.
+     * @param deliveryMode The delivery mode used for the send.
+     * @param priority     The priority used for the send.
+     * @param messageId    The message ID assigned to the message.
+     * @param expiration   The absolute expiration time, or zero if the message never expires.
+     * @param timestamp    The send timestamp, or zero if timestamps are disabled.
+     * @throws JMSException If a header cannot be set.
+     */
+    private static void applySendHeaders(Message target, Destination destination, int deliveryMode, int priority,
+                                         String messageId, long expiration, long timestamp) throws JMSException
+    {
+        target.setJMSDestination(destination);
+        target.setJMSMessageID(messageId);
+        target.setJMSDeliveryMode(deliveryMode);
+        target.setJMSPriority(priority);
+        target.setJMSExpiration(expiration);
+        target.setJMSTimestamp(timestamp);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.util.MessageListener;
+
+/**
+ * This listener dispatches messages to its browser.
+ */
+public class QpidBrowserListener implements MessageListener
+{
+    /**
+     * Used for debugging.
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(QpidBrowserListener.class);
+
+    /**
+     * This message listener's browser.
+     */
+    QueueBrowserImpl _browser = null;
+
+    //---- constructor
+    /**
+     * Create a message listener wrapper for a given browser
+     *
+     * @param browser The browser of this listener
+     */
+    public QpidBrowserListener(QueueBrowserImpl browser)
+    {
+        _browser = browser;
+    }
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     * <p>
+     * The conversion of the Qpid message into a JMS message is not implemented yet,
+     * so the browser is currently handed a <code>null</code> JMS message.
+     *
+     * @param message The message delivered to the listener.
+     * @throws RuntimeException If the browser fails to accept the message; the original
+     *                          exception is attached as the cause.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //convert this message into a JMS one
+            javax.jms.Message jmsMessage = null; // todo
+            _browser.receiveMessage(jmsMessage);
+        }
+        catch (Exception e)
+        {
+            // keep the original exception as the cause so that its stack trace is not lost
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,51 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception listener that converts Qpid exceptions into JMS exceptions and
+ * hands them to an optional JMS exception listener.
+ */
+public class QpidExceptionListenerImpl //implements ExceptionListener
+{
+    /**
+     * The JMS listener notified by onException; may be null when none has been set.
+     */
+    private javax.jms.ExceptionListener _jmsExceptionListener;
+
+    public QpidExceptionListenerImpl()
+    {
+    }
+
+    /**
+     * Set the JMS exception listener that receives the converted exceptions.
+     *
+     * @param jmsExceptionListener The JMS exception listener, or null to stop propagating.
+     */
+    void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener)
+    {
+        _jmsExceptionListener = jmsExceptionListener;
+    }
+
+    //----- ExceptionListener API
+    /**
+     * Convert the given Qpid exception into a JMS exception and propagate it to the
+     * JMS exception listener when one is set.
+     *
+     * @param exception The Qpid exception to propagate.
+     */
+    public void onException(QpidException exception)
+    {
+        // the conversion is always performed, even when nobody is listening
+        final JMSException converted = ExceptionHelper.convertQpidExceptionToJMSException(exception);
+        if (_jmsExceptionListener == null)
+        {
+            // no njms exception listener registered: nothing to propagate to
+            return;
+        }
+        _jmsExceptionListener.onException(converted);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,256 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+
+/**
+ * Implementation of the JMS QueueBrowser interface
+ */
+public class QueueBrowserImpl extends MessageActor implements QueueBrowser
+{
+    /**
+     * The browsers MessageSelector.
+     */
+    private String _messageSelector = null;
+
+    /**
+     * The message selector filter associated with this browser
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * The batch of messages to browse. This array is also the monitor that guards
+     * _browsed, _received and _batchLength.
+     */
+    private final Message[] _messages;
+
+    /**
+     * The number of messages read from current batch.
+     */
+    private int _browsed = 0;
+
+    /**
+     * The number of messages received from current batch.
+     */
+    private int _received = 0;
+
+    /**
+     * Presumably the number of messages expected for the current batch.
+     * NOTE(review): this field is only ever set to 0 in this class (the messageFlush call
+     * that should provide its value is commented out in requestMessages), so
+     * MessageEnumeration.hasMoreElements() cannot currently return true.
+     */
+    private int _batchLength;
+
+    /**
+     * The batch max size
+     */
+    private final int _maxbatchlength = 10;
+
+    //--- constructor
+
+    /**
+     * Create a QueueBrowser for a specific queue and a given message selector.
+     *
+     * @param session         The session of this browser.
+     * @param queue           The queue name for this browser
+     * @param messageSelector only messages with properties matching the message selector expression are delivered.
+     * @param consumerTag     The consumer tag, passed on to the MessageActor constructor.
+     * @throws Exception In case of internal problem when creating this browser.
+     */
+    protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
+    {
+        super(session, (DestinationImpl) queue,consumerTag);
+        // this is an array representing a batch of messages for this browser.
+        _messages = new Message[_maxbatchlength];
+        if (messageSelector != null)
+        {
+            _messageSelector = messageSelector;
+            _filter = new JMSSelectorFilter(messageSelector);
+        }
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidBrowserListener(this));
+        // this is a queue we expect that this queue exists
+        getSession().getQpidSession()
+                .messageSubscribe(queue.getQueueName(), getMessageActorID(),
+                                  org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                  // We do not acquire those messages
+                                  org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+
+    }
+
+    //--- javax.jms.QueueBrowser API
+    /**
+     * Get an enumeration for browsing the current queue messages in the order they would be received.
+     *
+     * @return An enumeration for browsing the messages
+     * @throws JMSException If getting the enumeration for this browser fails due to some internal error.
+     */
+    public Enumeration getEnumeration() throws JMSException
+    {
+        requestMessages();
+        return new MessageEnumeration();
+    }
+
+
+    /**
+     * Get the queue associated with this queue browser.
+     *
+     * @return The queue associated with this queue browser.
+     * @throws JMSException If getting the queue associated with this browser fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        checkNotClosed();
+        return (Queue) _destination;
+    }
+
+    /**
+     * Get this queue browser's message selector expression.
+     *
+     * @return This queue browser's message selector, or null if no message selector exists.
+     * @throws JMSException if getting the message selector for this browser fails due to some internal error.
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        checkNotClosed();
+        return _messageSelector;
+    }
+
+    //-- overwritten methods.
+    /**
+     * Closes the browser and deregister it from its session.
+     *
+     * @throws JMSException if the MessageActor cannot be closed due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        synchronized (_messages)
+        {
+            _received = 0;
+            _browsed = 0;
+            _batchLength = 0;
+            // wake up every thread blocked in hasMoreElements(), not just one of them
+            _messages.notifyAll();
+        }
+        super.close();
+    }
+
+    //-- nonpublic methods
+    /**
+     * Request _maxbatchlength messages
+     *
+     * @throws JMSException If requesting more messages fails due to some internal error.
+     */
+    private void requestMessages() throws JMSException
+    {
+        // reset the counters under the monitor that receiveMessage() and the enumeration use
+        synchronized (_messages)
+        {
+            _browsed = 0;
+            _received = 0;
+        }
+        // request messages
+        getSession().getQpidSession()
+                .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                             _maxbatchlength);
+        synchronized (_messages)
+        {
+            _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
+        }
+    }
+
+    /**
+     * This method is invoked by the listener when a message is dispatched to this browser.
+     *
+     * @param m A received message
+     */
+    protected void receiveMessage(Message m)
+    {
+        synchronized (_messages)
+        {
+            _messages[_received] = m;
+            _received++;
+            _messages.notify();
+        }
+    }
+
+    //-- inner class
+    /**
+     * This is an implementation of the Enumeration interface.
+     */
+    private class MessageEnumeration implements Enumeration
+    {
+        /**
+         * Whether this enumeration has any more elements.
+         *
+         * @return True if there any more elements.
+         */
+        public boolean hasMoreElements()
+        {
+            boolean result = false;
+            // Try to work out whether there are any more messages available.
+            try
+            {
+                if (_browsed >= _maxbatchlength)
+                {
+                    requestMessages();
+                }
+                synchronized (_messages)
+                {
+                    while (_received == _browsed && _batchLength > _browsed)
+                    {
+                        // we expect more messages
+                        _messages.wait();
+                    }
+                    if (_browsed < _received && _batchLength != _browsed)
+                    {
+                        result = true;
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            {
+                // report "no more elements" but restore the interrupt status for the caller
+                Thread.currentThread().interrupt();
+            }
+            catch (Exception e)
+            {
+                // If no batch could be returned, the result should be false, therefore do nothing
+            }
+            return result;
+        }
+
+        /**
+         * Get the next message element
+         *
+         * @return The next element.
+         * @throws NoSuchElementException If there are no more messages to browse.
+         */
+        public Object nextElement()
+        {
+            if (hasMoreElements())
+            {
+                synchronized (_messages)
+                {
+                    Message message = _messages[_browsed];
+                    _browsed = _browsed + 1;
+                    return message;
+                }
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.njms.ConnectionImpl;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.QueueConnection;
+
+/**
+ *
+ * Implements javax.njms.QueueConnection
+ */
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+ //-- constructor
+ public QueueConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,136 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
+import javax.jms.Queue;
+import javax.jms.JMSException;
+
+/**
+ * Implementation of the JMS Queue interface
+ */
+public class QueueImpl extends DestinationImpl implements Queue
+{
+    //--- Constructor
+    /**
+     * Create a new QueueImpl with a given name and register it through the given session.
+     *
+     * @param session The session used to create this queue.
+     * @param name    The name of this queue.
+     * @throws QpidException If the queue name is not valid
+     */
+    protected QueueImpl(SessionImpl session, String name) throws QpidException
+    {
+        super(name);
+        applyDirectQueueDefaults(name);
+        registerQueue(session, false);
+    }
+
+    /**
+     * Create a destination from a binding URL and register it through the given session.
+     *
+     * @param session The session used to create this queue.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(binding);
+        registerQueue(session, false);
+    }
+
+    /**
+     * Create a destination from a binding URL without registering it.
+     *
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    public QueueImpl(BindingURL binding) throws QpidException
+    {
+        super(binding);
+    }
+
+    /**
+     * Create a new QueueImpl with a given name without registering it.
+     *
+     * @param name The name of this queue.
+     * @throws QpidException If the queue name is not valid
+     */
+    public QueueImpl(String name) throws QpidException
+    {
+        super(name);
+        applyDirectQueueDefaults(name);
+    }
+
+    //---- Interface javax.jms.Queue
+    /**
+     * Gets the name of this queue.
+     *
+     * @return This queue's name.
+     */
+    public String getQueueName() throws JMSException
+    {
+        return _queueName;
+    }
+
+    //---Private methods
+    /**
+     * Set the fields shared by the name based constructors: the queue and destination
+     * names are the given name, the exchange is the default direct exchange and the
+     * queue is durable, neither auto-deleted nor exclusive.
+     *
+     * @param name The name of this queue.
+     */
+    private void applyDirectQueueDefaults(String name)
+    {
+        _queueName = name;
+        _destinationName = name;
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+        _isAutoDelete = false;
+        _isDurable = true;
+        _isExclusive = false;
+    }
+
+    /**
+     * Passively declare this queue's exchange, declare the queue with its durable,
+     * auto-delete and exclusive settings, then bind the queue to the exchange.
+     * <p>
+     * NOTE(review): the queue declare is passive when <code>declare</code> is true and
+     * non-passive otherwise, which looks inverted with respect to the parameter
+     * description below - confirm the intent against the callers before changing it.
+     *
+     * @param session The session used to create this destination
+     * @param declare Specify whether the queue should be declared
+     * @throws QpidException If this queue does not exists on the broker.
+     */
+    protected void registerQueue(SessionImpl session, boolean declare) throws QpidException
+    {
+        // test if this exchange exist on the broker
+        //todo we can also specify if the exchange is autodelete and durable
+        session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
+        // wait for the broker response
+        session.getQpidSession().sync();
+        // If this exchange does not exist then we will get an Exception 404 does not exist
+        //todo check for an exception
+        // now work out the options of the queue declare
+        final Option durable = _isDurable ? Option.DURABLE : Option.NO_OPTION;
+        final Option autoDelete = _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION;
+        final Option exclusive = _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION;
+        final Option passive = declare ? Option.PASSIVE : Option.NO_OPTION;
+        session.getQpidSession().queueDeclare(_queueName, null, null, durable, autoDelete, exclusive, passive);
+        // wait for the broker response
+        session.getQpidSession().sync();
+        // If this queue does not exist then we will get an Exception 404 does not exist
+        session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null);
+        // we don't have to sync as we don't expect an error
+    }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.QueueReceiver;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Implements javax.jms.QueueReceiver
+ */
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+    //--- Constructor
+    /**
+     * Create a new QueueReceiverImpl.
+     *
+     * @param session         The session from which the QueueReceiverImpl is instantiated.
+     * @param queue           The default queue for this QueueReceiverImpl.
+     * @param messageSelector The message selector for this QueueReceiverImpl.
+     * @param consumerTag     The consumer tag, passed on to the MessageConsumerImpl constructor.
+     * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
+     */
+    protected QueueReceiverImpl(SessionImpl session,
+                                Queue queue,
+                                String messageSelector,
+                                String consumerTag) throws Exception
+    {
+        super(session, (DestinationImpl) queue, messageSelector, false, null, consumerTag);
+    }
+
+    //--- Interface QueueReceiver
+    /**
+     * Get the Queue associated with this queue receiver.
+     *
+     * @return this receiver's Queue
+     * @throws JMSException If getting the queue for this queue receiver fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        checkNotClosed();
+        final QueueImpl receiverQueue = (QueueImpl) _destination;
+        return receiverQueue;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,131 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.QueueSender;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Message;
+
+/**
+ * Implements javax.jms.QueueSender by delegating to MessageProducerImpl.
+ */
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+    //--- Constructor
+    /**
+     * Create a new QueueSenderImpl.
+     *
+     * @param session The session from which the QueueSenderImpl is instantiated
+     * @param queue   The default queue for this QueueSenderImpl
+     * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error.
+     */
+    protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException
+    {
+        super(session, queue);
+    }
+
+    //--- Interface javax.jms.QueueSender
+    /**
+     * Get the queue associated with this QueueSender.
+     *
+     * @return This QueueSender's queue
+     * @throws JMSException If getting the queue for this QueueSender fails due to some internal error.
+     */
+    public Queue getQueue() throws JMSException
+    {
+        final Queue senderQueue = (Queue) getDestination();
+        return senderQueue;
+    }
+
+    /**
+     * Send a message to the queue using the {@code QueueSender}'s default delivery mode,
+     * priority, and time to live.
+     *
+     * @param message The message to send.
+     * @throws JMSException If sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message) throws JMSException
+    {
+        super.send(message);
+    }
+
+    /**
+     * Send a message to the queue with an explicit delivery mode, priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException If sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                      If invoked on QueueSender that did not specify a queue at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Send a message to the given queue for an unidentified message producer, using the
+     * {@code QueueSender}'s default delivery mode, priority, and time to live.
+     *
+     * @param queue   The queue to send this message to
+     * @param message The message to send
+     * @throws JMSException If sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message) throws JMSException
+    {
+        super.send(queue, message);
+    }
+
+    /**
+     * Send a message to the given queue for an unidentified message producer, with an
+     * explicit delivery mode, priority and time to live.
+     *
+     * @param queue        The queue to send this message to
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException If sending the message fails due to some internal error.
+     * @throws javax.jms.MessageFormatException
+     *                      If an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException
+     *                      If the queue is invalid.
+     */
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,154 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * Implementation of {@link javax.jms.QueueSession}.
+ * <p>
+ * This session only exposes queue (point-to-point) operations: every topic-related
+ * method overridden below unconditionally throws {@link javax.jms.IllegalStateException}.
+ */
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+ //--- constructor
+ /**
+ * Create a JMS queue session.
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @param transacted Indicates if the session transacted.
+ * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+ * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code>
+ * parameter is true.
+ * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+ * @throws javax.jms.JMSException In case of internal error.
+ * @throws QpidException If the underlying session cannot be created.
+ */
+ protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException
+ {
+ // NOTE(review): the meaning of the trailing 'false' flag is defined by SessionImpl - confirm there.
+ super(connection, transacted, acknowledgeMode,false);
+ }
+
+ //-- Overridden methods (topic operations are not permitted on a QueueSession)
+ /**
+ * Creates a durable subscriber to the specified topic; not supported by a queue session.
+ *
+ * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+ * @param name The name used to identify this subscription.
+ * @return Never returns normally.
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession");
+ }
+
+ /**
+ * Create a TemporaryTopic; not supported by a queue session.
+ *
+ * @return Never returns normally.
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession");
+ }
+
+ /**
+ * Creates a topic identity given a topic name; not supported by a queue session.
+ *
+ * @param topicName The name of this <CODE>Topic</CODE>
+ * @return Never returns normally.
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public Topic createTopic(String topicName) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createTopic from QueueSession");
+ }
+
+ /**
+ * Unsubscribes a durable subscription; not supported by a queue session.
+ *
+ * @param name the name used to identify this subscription
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public void unsubscribe(String name) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession");
+ }
+
+ //--- Interface javax.jms.QueueSession
+ /**
+ * Create a QueueReceiver to receive messages from the specified queue.
+ * Equivalent to calling {@link #createReceiver(Queue, String)} with a null selector.
+ *
+ * @param queue the <CODE>Queue</CODE> to access
+ * @return A QueueReceiver
+ * @throws JMSException If creating a receiver fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid queue is specified.
+ */
+ public QueueReceiver createReceiver(Queue queue) throws JMSException
+ {
+ return createReceiver(queue, null);
+ }
+
+ /**
+ * Create a QueueReceiver to receive messages from the specified queue for a given message selector.
+ *
+ * @param queue the Queue to access
+ * @param messageSelector A value of null or an empty string indicates that
+ * there is no message selector for the message consumer.
+ * @return A QueueReceiver
+ * @throws JMSException If creating a receiver fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid queue is specified.
+ * @throws InvalidSelectorException If the message selector is invalid.
+ */
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+ {
+ checkNotClosed();
+ checkDestination(queue);
+ try
+ {
+ // Each receiver is tagged with the next value of the consumer-tag counter.
+ return new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
+ }
+ catch (Exception e)
+ {
+ // Any failure while building the receiver is reported to the caller as a JMSException.
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Create a QueueSender object to send messages to the specified queue.
+ *
+ * @param queue the Queue to access, or null if this is an unidentified producer
+ * @return A QueueSender
+ * @throws JMSException If creating the sender fails due to some internal error.
+ * @throws InvalidDestinationException If a non-null queue is specified that is not a
+ * <CODE>QueueImpl</CODE> and therefore cannot be used by this session.
+ */
+ public QueueSender createSender(Queue queue) throws JMSException
+ {
+ checkNotClosed();
+ // A null queue is legal here: unidentified producers are allowed (no default destination).
+ // A non-null queue must be a QueueImpl; reject anything else with the documented
+ // InvalidDestinationException instead of letting the cast below fail with a ClassCastException.
+ if (queue != null && !(queue instanceof QueueImpl))
+ {
+ throw new InvalidDestinationException("Invalid queue specified: " + queue);
+ }
+ return new QueueSenderImpl(this, (QueueImpl) queue);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native