You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/06 12:32:52 UTC
svn commit: r563097 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/nclient/
client/src/main/java/org/apache/qpid/nclient/impl/
client/src/main/java/org/apache/qpid/nclient/jms/
client/src/main/java/org/apache/qpid/nclient/jm...
Author: arnaudsimon
Date: Mon Aug 6 03:32:50 2007
New Revision: 563097
URL: http://svn.apache.org/viewvc?view=rev&rev=563097
Log: (empty)
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Mon Aug 6 03:32:50 2007
@@ -77,23 +77,24 @@
//------------------------------------------------------
/**
* Transfer the given message to a specified exchange.
- * <p> Following are the valid options for messageTransfer
- * <ul>
- * <li> CONFIRM
- * <li> PRE_ACCQUIRE
- * </ul>
- * <p> In the absence of a particular option, the defaul value is:
- * <ul>
- * <li> CONFIRM = false
- * <li> NO-ACCQUIRE
- * </ul>
*
- * @param exchange The exchange the message is being sent.
- * @param msg The Message to be sent
- * @param options A list of valid options
+ * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired
+ * <p/>
+ * <li> pre-acquire (1): the message is acquired when the transfer starts
+ * </ul>
+ * @param exchange The exchange the message is being sent.
+ * @param msg The Message to be sent
* @throws QpidException If the session fails to send the message due to some error
*/
- public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException;
+ public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException;
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -103,31 +104,31 @@
* <p> In the interval [messageTransfer endData] any attempt to call a method other than
* {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close}
* will result in an exception being thrown.
- * <p> Following are the valid options for messageTransfer
- * <ul>
- * <li> CONFIRM
- * <li> PRE_ACCQUIRE
- * </ul>
- * <p> In the absence of a particular option, the defaul value is:
- * <ul>
- * <li> CONFIRM = false
- * <li> NO-ACCQUIRE
- * </ul>
*
- * @param exchange The exchange the message is being sent.
- * @param options Set of options.
+ * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired
+ * <p/>
+ * <li> pre-acquire (1): the message is acquired when the transfer starts
+ * </ul>
+ * @param exchange The exchange the message is being sent.
* @throws QpidException If the session fails to send the message due to some error.
*/
- public void messageTransfer(String exchange, Option... options) throws QpidException;
+ public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException;
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
- * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent.
+ * or to the message being sent.
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @throws QpidException If the session fails to execute the method due to some error
* @see org.apache.qpidity.DeliveryProperties
- * @see org.apache.qpidity.ApplicationProperties
*/
public void addMessageHeaders(Header... headers) throws QpidException;
@@ -371,7 +372,12 @@
* <p>In the absence of a particular option, the defaul value is false for each option
*
* @param queueName The name of the delcared queue.
- * @param alternateExchange Alternate excahnge.
+ * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
+ * may be rejected by a queue for the following reasons:
+ * <oL> <li> The queue is deleted when it is not empty;
+ * <<li> Immediate delivery of a message is requested, but there are no consumers connected to
+ * the queue. </ol>
+ * @param arguments Used for backward compatibility
* @param options Set of Options.
* @throws QpidException If the session fails to declare the queue due to some error.
* @see Option
@@ -385,6 +391,7 @@
* @param queueName The queue to be bound.
* @param exchangeName The exchange name.
* @param routingKey The routing key.
+ * @param arguments Used for backward compatibility
* @throws QpidException If the session fails to bind the queue due to some error.
*/
public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
@@ -396,6 +403,7 @@
* @param queueName The queue to be unbound.
* @param exchangeName The exchange name.
* @param routingKey The routing key.
+ * @param arguments Used for backward compatibility
* @throws QpidException If the session fails to unbind the queue due to some error.
*/
public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
@@ -448,9 +456,12 @@
* <p/>
* <p>In the absence of a particular option, the defaul value is false for each option</p> *
*
- * @param exchangeName The exchange name.
- * @param exchangeClass The fully qualified name of the exchange class.
- * @param options Set of options.
+ * @param exchangeName The exchange name.
+ * @param exchangeClass The fully qualified name of the exchange class.
+ * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
+ * the message will be sent.
+ * @param options Set of options.
+ * @param arguments Used for backward compatibility
* @throws QpidException If the session fails to declare the exchange due to some error.
* @see Option
*/
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Mon Aug 6 03:32:50 2007
@@ -13,162 +13,192 @@
public class ClientSession implements org.apache.qpid.nclient.Session
{
- Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
-
+ Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
+
+
//------------------------------------------------------
// Session housekeeping methods
//------------------------------------------------------
public void close() throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void suspend() throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void resume() throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}//------------------------------------------------------
// Messaging methods
// Producer
//------------------------------------------------------
- public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException
+ public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageTransfer(String exchange, Option... options) throws QpidException
+ public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void addMessageHeaders(Header... headers) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void addData(byte[] data, int off, int len) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void endData() throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter,
- Option... options) throws QpidException
+ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
+ MessagePartListener listener, Map<String, ?> filter, Option... options)
+ throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void messageCancel(String destination) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageAcknowledge(Range... range) throws QpidException
+ public void setMessageListener(String destination, MessagePartListener listener)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageReject(Range... range) throws QpidException
+ public void messageFlowMode(String destination, short mode) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public Range[] messageAcquire(Range... range) throws QpidException
+ public void messageFlow(String destination, short unit, long value) throws QpidException
{
- return new Range[0]; //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageRelease(Range... range) throws QpidException
+ public boolean messageFlush(String destination) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+ return false;
}
+ public void messageStop(String destination) throws QpidException
+ {
+ // TODO
- public void messageFlowMode(String destination, short mode)
+ }
+
+ public void messageAcknowledge(Range<Long>... range) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void messageFlow(String destination, short unit, long value)
+ public void messageReject(Range<Long>... range) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public boolean messageFlush(String destination)
+ public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+ return null;
}
- public void messageStop(String destination)
+ public void messageRelease(Range<Long>... range) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
public void txSelect() throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void txCommit() throws QpidException, IllegalStateException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void txRollback() throws QpidException, IllegalStateException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
- Option... options) throws QpidException
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
+ throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
- QpidException
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+ throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
- QpidException
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+ throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void queuePurge(String queueName) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void queueDelete(String queueName, Option... options) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
Map<String, ?> arguments, Option... options) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // TODO
+
}
public void exchangeDelete(String exchangeName, Option... options) throws QpidException
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setMessageListener(String destination,MessagePartListener listener)
- {
- messagListeners.put(destination, listener);
+ // TODO
+
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java Mon Aug 6 03:32:50 2007
@@ -17,8 +17,11 @@
*/
package org.apache.qpid.nclient.jms;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.url.BindingURL;
+
import javax.jms.Destination;
-import javax.jms.JMSException;
/**
* Implementation of the JMS Destination interface
@@ -35,24 +38,60 @@
*/
protected SessionImpl _session;
+ /**
+ * The excahnge name
+ */
+ protected String _exchangeName;
+
+ /**
+ * The excahnge class
+ */
+ protected String _exchangeClass;
+
+ /**
+ * The queu name
+ */
+ protected String _queueName;
+
//--- Constructor
/**
* Create a new DestinationImpl with a given name.
*
- * @param name The name of this destination.
+ * @param name The name of this destination.
* @param session The session used to create this destination.
- * @throws JMSException If the destiantion name is not valid
+ * @throws QpidException If the destiantion name is not valid
*/
- protected DestinationImpl(SessionImpl session, String name) throws JMSException
+ protected DestinationImpl(SessionImpl session, String name) throws QpidException
{
- // TODO validate that this destination name exists
- //_session.getQpidSession()
_session = session;
_name = name;
}
+ /**
+ * Create a destiantion from a binding URL
+ *
+ * @param session The session used to create this queue.
+ * @param binding The URL
+ * @throws QpidException If the URL is not valid
+ */
+ protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException
+ {
+ _session = session;
+ _exchangeName = binding.getExchangeName();
+ _exchangeClass = binding.getExchangeClass();
+ _name = binding.getDestinationName();
+ // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+ boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+ boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _queueName = binding.getQueueName();
+ // create this exchange
+ _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null,
+ isDurable ? Option.DURABLE : Option.NO_OPTION,
+ isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION);
+ }
+
//---- Getters and Setters
-
+
/**
* Gets the name of this destination.
*
@@ -84,5 +123,20 @@
return _name;
}
+ // getter methods
+ public String getQpidQueueName()
+ {
+ return _queueName;
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public String getExchangeClass()
+ {
+ return _exchangeClass;
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Mon Aug 6 03:32:50 2007
@@ -17,8 +17,6 @@
*/
package org.apache.qpid.nclient.jms;
-//import org.apache.qpid.nclient.api.MessageReceiver;
-
import org.apache.qpid.nclient.jms.message.QpidMessage;
import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter;
import org.apache.qpid.nclient.jms.filter.MessageFilter;
@@ -27,6 +25,7 @@
import org.apache.qpidity.Range;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
+import org.apache.qpidity.exchange.ExchangeDefaults;
import javax.jms.*;
@@ -120,15 +119,16 @@
_noLocal = noLocal;
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
+ // let's create a message part assembler
+ /**
+ * A Qpid message listener that pushes messages to this consumer session when this consumer is
+ * asynchronous or directly to this consumer when it is synchronously accessed.
+ */
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+
if (destination instanceof Queue)
{
// this is a queue we expect that this queue exists
- // let's create a message part assembler
- /**
- * A Qpid message listener that pushes messages to this consumer session when this consumer is
- * asynchronous or directly to this consumer when it is synchronously accessed.
- */
- MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
getSession().getQpidSession()
.messageSubscribe(destination.getName(), getMessageActorID(),
org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED,
@@ -144,25 +144,44 @@
{
// this is a topic we need to create a temporary queue for this consumer
// unless this is a durable subscriber
+ String queueName;
if (subscriptionName != null)
{
// this ia a durable subscriber
// create a persistent queue for this subscriber
- // getSession().getQpidSession().queueDeclare(destination.getName());
+ queueName = "topic-" + subscriptionName;
+ getSession().getQpidSession()
+ .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
}
else
{
// this is a non durable subscriber
// create a temporary queue
-
+ queueName = "topic-" + getMessageActorID();
+ getSession().getQpidSession()
+ .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
}
+ // bind this queue with the topic exchange
+ getSession().getQpidSession()
+ .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null);
+ // subscribe to this topic
+ getSession().getQpidSession()
+ .messageSubscribe(queueName, getMessageActorID(),
+ org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED,
+ // We always acquire the messages
+ org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
+ _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+ // Request exclusive subscription access, meaning only this subscription
+ // can access the queue.
+ Option.EXCLUSIVE);
+
}
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
}
- //----- Message consumer API
+ //----- Message consumer API
/**
* Gets this MessageConsumer's message selector.
*
@@ -426,7 +445,14 @@
{
messageOk = _filter.matches(message.getJMSMessage());
}
- // right now we need to acquire this message if needed
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ acknowledgeMessage(message);
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
if (!_preAcquire && messageOk)
{
messageOk = acquireMessage(message);
@@ -568,5 +594,20 @@
}
}
return result;
+ }
+
+ /**
+ * Acknowledge a message
+ *
+ * @param message The message to be acknowledged
+ * @throws QpidException If the message cannot be acquired due to some internal error.
+ */
+ private void acknowledgeMessage(QpidMessage message) throws QpidException
+ {
+ if (!_preAcquire)
+ {
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ getSession().getQpidSession().messageAcknowledge(range);
+ }
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java Mon Aug 6 03:32:50 2007
@@ -17,96 +17,300 @@
*/
package org.apache.qpid.nclient.jms;
-import javax.jms.MessageProducer;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.Message;
+import javax.jms.*;
/**
- * Implements MessageProducer
+ * Implements MessageProducer
*/
public class MessageProducerImpl extends MessageActor implements MessageProducer
{
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ private boolean _disableTimestamps = false;
+
+ /**
+ * Priority of messages created by this producer.
+ */
+ private int _messagePriority = Message.DEFAULT_PRIORITY;
+
+ /**
+ * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ */
+ private long _timeToLive;
+
+ /**
+ * Delivery mode used for this producer.
+ */
+ private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+ /**
+ * Speicify whether the messageID is disable
+ */
+ private boolean _disableMessageId = false;
+ //-- constructors
public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
{
super(session, destination);
}
- // Interface javax.jms.MessageProducer
-
- public void setDisableMessageID(boolean b) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
+ //--- Interface javax.jms.MessageProducer
+ /**
+ * Sets whether message IDs are disabled.
+ *
+ * @param value Specify whether the MessageID must be disabled
+ * @throws JMSException If disabling messageID fails due to some internal error.
+ */
+ public void setDisableMessageID(boolean value) throws JMSException
+ {
+ checkNotClosed();
+ _disableMessageId = value;
+ }
+
+ /**
+ * Gets an indication of whether message IDs are disabled.
+ *
+ * @return true is messageID is disabled, false otherwise
+ * @throws JMSException If getting whether messagID is disabled fails due to some internal error.
+ */
public boolean getDisableMessageID() throws JMSException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setDisableMessageTimestamp(boolean b) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _disableMessageId;
}
+ /**
+ * Sets whether message timestamps are disabled.
+ * <P> JMS spec says:
+ * <p> Since timestamps take some effort to create and increase a
+ * message's size, some JMS providers may be able to optimize message
+ * overhead if they are given a hint that the timestamp is not used by an
+ * application....
+ * these messages must have the timestamp set to zero; if the provider
+ * ignores the hint, the timestamp must be set to its normal value.
+ * <p>Message timestamps are enabled by default.
+ *
+ * @param value Indicates if message timestamps are disabled
+ * @throws JMSException if disabling the timestamps fails due to some internal error.
+ */
+ public void setDisableMessageTimestamp(boolean value) throws JMSException
+ {
+ checkNotClosed();
+ _disableTimestamps = value;
+ }
+
+ /**
+ * Gets an indication of whether message timestamps are disabled.
+ *
+ * @return an indication of whether message timestamps are disabled
+ * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
+ */
public boolean getDisableMessageTimestamp() throws JMSException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setDeliveryMode(int i) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _disableTimestamps;
}
+ /**
+ * Sets the producer's default delivery mode.
+ * <p> JMS specification says:
+ * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
+ *
+ * @param deliveryMode The message delivery mode for this message producer; legal
+ * values are {@link DeliveryMode#NON_PERSISTENT}
+ * and {@link DeliveryMode#PERSISTENT}.
+ * @throws JMSException if setting the delivery mode fails due to some internal error.
+ */
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ checkNotClosed();
+ if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT))
+ {
+ throw new JMSException(
+ "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
+ }
+ _deliveryMode = deliveryMode;
+ }
+
+ /**
+ * Gets the producer's delivery mode.
+ *
+ * @return The message delivery mode for this message producer
+ * @throws JMSException If getting the delivery mode fails due to some internal error.
+ */
public int getDeliveryMode() throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setPriority(int i) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _deliveryMode;
}
+ /**
+ * Sets the producer's message priority.
+ * <p> The jms spec says:
+ * <p> The JMS API defines ten levels of priority value, with 0 as the
+ * lowest priority and 9 as the highest. Clients should consider priorities
+ * 0-4 as gradations of normal priority and priorities 5-9 as gradations
+ * of expedited priority.
+ * <p> Priority is set to 4 by default.
+ *
+ * @param priority The message priority for this message producer; must be a value between 0 and 9
+ * @throws JMSException if setting this producer priority fails due to some internal error.
+ */
+ public void setPriority(int priority) throws JMSException
+ {
+ checkNotClosed();
+ if ((priority < 0) || (priority > 9))
+ {
+ throw new IllegalArgumentException(
+ "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = priority;
+ }
+
+ /**
+ * Gets the producer's message priority.
+ *
+ * @return The message priority for this message producer.
+ * @throws JMSException If getting this producer message priority fails due to some internal error.
+ */
public int getPriority() throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setTimeToLive(long l) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _messagePriority;
}
+ /**
+ * Sets the default length of time in milliseconds from its dispatch time
+ * that a produced message should be retained by the message system.
+ * <p> The JMS spec says that time to live must be set to zero by default.
+ *
+ * @param timeToLive The message time to live in milliseconds; zero is unlimited
+ * @throws JMSException If setting the default time to live fails due to some internal error.
+ */
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ checkNotClosed();
+ if (timeToLive < 0)
+ {
+ throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+ }
+ _timeToLive = timeToLive;
+ }
+
+ /**
+ * Gets the default length of time in milliseconds from its dispatch time
+ * that a produced message should be retained by the message system.
+ *
+ * @return The default message time to live in milliseconds; zero is unlimited
+ * @throws JMSException if getting the default time to live fails due to some internal error.
+ * @see javax.jms.MessageProducer#setTimeToLive
+ */
public long getTimeToLive() throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _timeToLive;
}
+ /**
+ * Gets the destination associated with this producer.
+ *
+ * @return This producer's destination.
+ * @throws JMSException If getting the destination for this producer fails
+ * due to some internal error.
+ */
public Destination getDestination() throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ return _destination;
}
+ /**
+ * Sends a message using the producer's default delivery mode, priority, destination
+ * and time to live.
+ *
+ * @param message the message to be sent
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If this producer destination is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If a client uses this method with a producer that did
+ * not specify a destination at creation time.
+ */
public void send(Message message) throws JMSException
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void send(Message message, int i, int i1, long l) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ send(message, _deliveryMode, _messagePriority, _timeToLive);
}
+ /**
+ * Sends a message to this producer default destination, specifying delivery mode,
+ * priority, and time to live.
+ *
+ * @param message The message to send
+ * @param deliveryMode The delivery mode to use
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If this producer's destination is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If a client uses this method with a producer that did
+ * not specify a destination at creation time.
+ */
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ send(_destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * Sends a message to a specified destination using this producer's default
+ * delivery mode, priority and time to live.
+ * <p/>
+ * <P>Typically, a message producer is assigned a destination at creation
+ * time; however, the JMS API also supports unidentified message producers,
+ * which require that the destination be supplied every time a message is
+ * sent.
+ *
+ * @param destination The destination to send this message to
+ * @param message The message to send
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If an invalid destination is specified.
+ */
public void send(Destination destination, Message message) throws JMSException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
}
- public void send(Destination destination, Message message, int i, int i1, long l) throws JMSException
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ /**
+ * Sends a message to a destination specifying delivery mode, priority and time to live.
+ *
+ * @param destination The destination to send this message to.
+ * @param message The message to be sent.
+ * @param deliveryMode The delivery mode to use.
+ * @param priority The priority for this message.
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException If sending the message fails due to some internal error.
+ * @throws MessageFormatException If an invalid message is specified.
+ * @throws InvalidDestinationException If an invalid destination is specified.
+ */
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ checkNotClosed();
+ getSession().checkDestination(destination);
+ // Do not allow negative timeToLive values
+ if (timeToLive < 0)
+ {
+ throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+ }
+ // check that the message is not a foreign one
+
+ // set the properties
+
+ //
+
+ // dispatch it
+ // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java Mon Aug 6 03:32:50 2007
@@ -17,6 +17,11 @@
*/
package org.apache.qpid.nclient.jms;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
import javax.jms.Queue;
import javax.jms.JMSException;
@@ -32,15 +37,32 @@
*
* @param name The name of this queue.
* @param session The session used to create this queue.
- * @throws JMSException If the queue name is not valid
+ * @throws QpidException If the queue name is not valid
*/
- protected QueueImpl(SessionImpl session, String name) throws JMSException
+ protected QueueImpl(SessionImpl session, String name) throws QpidException
{
super(session, name);
+ _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ _queueName = name;
+ // check that this queue exist on the server
+ // As pasive is set the server will not create the queue.
+ session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE);
}
- //---- Interface javax.jms.Queue
+ /**
+ * Create a destiantion from a binding URL
+ *
+ * @param session The session used to create this queue.
+ * @param binding The URL
+ * @throws QpidException If the URL is not valid
+ */
+ protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
+ {
+ super(session, binding);
+ }
+ //---- Interface javax.jms.Queue
/**
* Gets the name of this queue.
*
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Mon Aug 6 03:32:50 2007
@@ -569,7 +569,7 @@
{
checkNotClosed();
checkDestination(destination);
- MessageConsumerImpl consumer = null;
+ MessageConsumerImpl consumer;
try
{
consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
@@ -602,7 +602,16 @@
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
- return new QueueImpl(this, queueName);
+ Queue result;
+ try
+ {
+ result = new QueueImpl(this, queueName);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
/**
@@ -624,7 +633,16 @@
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
- return new TopicImpl(this, topicName);
+ Topic result;
+ try
+ {
+ result = new TopicImpl(this, topicName);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
/**
@@ -713,25 +731,43 @@
}
/**
- * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier.
+ * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier.
*
* @return A temporary queue.
* @throws JMSException If creating the temporary queue fails due to some internal error.
*/
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- return new TemporaryQueueImpl(this);
+ TemporaryQueue result;
+ try
+ {
+ result = new TemporaryQueueImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
/**
- * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier.
+ * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier.
*
* @return A temporary topic.
* @throws JMSException If creating the temporary topic fails due to some internal error.
*/
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- return new TemporaryTopicImpl(this);
+ TemporaryTopic result;
+ try
+ {
+ result = new TemporaryTopicImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ return result;
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java Mon Aug 6 03:32:50 2007
@@ -17,13 +17,18 @@
*/
package org.apache.qpid.nclient.jms;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
import javax.jms.TemporaryQueue;
import javax.jms.JMSException;
+import java.util.UUID;
/**
* Implements TemporaryQueue
*/
-public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination
+public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination
{
/**
* Indicates whether this temporary queue is deleted.
@@ -32,16 +37,23 @@
//--- constructor
- /**
+ /**
* Create a new TemporaryQueueImpl with a given name.
*
* @param session The session used to create this TemporaryQueueImpl.
- * @throws JMSException If creating the TemporaryQueueImpl fails due to some error.
+ * @throws QpidException If creating the TemporaryQueueImpl fails due to some error.
*/
- public TemporaryQueueImpl(SessionImpl session) throws JMSException
+ protected TemporaryQueueImpl(SessionImpl session) throws QpidException
{
- // temporary destinations do not have names and are not registered in the JNDI namespace.
+ // temporary destinations do not have names
super(session, "NAME_NOT_SET");
+ _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ _queueName = "TempQueue-" + UUID.randomUUID();
+ // check that this queue exist on the server
+ // As pasive is set the server will not create the queue.
+ session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE);
+ session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null);
}
//-- TemporaryDestination Interface
@@ -59,11 +71,22 @@
/**
* Delete this temporary destinaiton
*
- * @throws JMSException If deleting this temporary queue fails due to some error.
+ * @throws JMSException If deleting this temporary queue fails due to some error.
*/
public void delete() throws JMSException
{
// todo delete this temporary queue
_isDeleted = true;
+ }
+
+ //---- Interface javax.jms.Queue
+ /**
+ * Gets the name of this queue.
+ *
+ * @return This queue's name.
+ */
+ public String getQueueName() throws JMSException
+ {
+ return super.getName();
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java Mon Aug 6 03:32:50 2007
@@ -17,6 +17,8 @@
*/
package org.apache.qpid.nclient.jms;
+import org.apache.qpidity.QpidException;
+
import javax.jms.TemporaryTopic;
import javax.jms.JMSException;
@@ -36,9 +38,9 @@
* Create a new TemporaryTopicImpl with a given name.
*
* @param session The session used to create this TemporaryTopicImpl.
- * @throws JMSException If creating the TemporaryTopicImpl fails due to some error.
+ * @throws QpidException If creating the TemporaryTopicImpl fails due to some error.
*/
- public TemporaryTopicImpl(SessionImpl session) throws JMSException
+ protected TemporaryTopicImpl(SessionImpl session) throws QpidException
{
// temporary destinations do not have names.
super(session, "NAME_NOT_SET");
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java Mon Aug 6 03:32:50 2007
@@ -17,8 +17,11 @@
*/
package org.apache.qpid.nclient.jms;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.url.BindingURL;
+
import javax.jms.Topic;
-import javax.jms.JMSException;
/**
* Implementation of the javax.jms.Topic interface.
@@ -29,16 +32,30 @@
/**
* Create a new TopicImpl with a given name.
*
- * @param name The name of this topic
+ * @param name The name of this topic
* @param session The session used to create this queue.
- * @throws JMSException If the topic name is not valid
+ * @throws QpidException If the topic name is not valid
*/
- public TopicImpl(SessionImpl session, String name) throws JMSException
+ public TopicImpl(SessionImpl session, String name) throws QpidException
{
super(session, name);
+ _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+ }
+
+ /**
+ * Create a TopicImpl from a binding URL
+ *
+ * @param session The session used to create this Topic.
+ * @param binding The URL
+ * @throws QpidException If the URL is not valid
+ */
+ protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException
+ {
+ super(session, binding);
}
- //--- javax.jsm.Topic Interface
+ //--- javax.jsm.Topic Interface
/**
* Gets the name of this topic.
*
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java Mon Aug 6 03:32:50 2007
@@ -33,17 +33,14 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.*;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import java.util.UUID;
-public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message
+public abstract class AbstractJMSMessage extends QpidMessage implements Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java Mon Aug 6 03:32:50 2007
@@ -145,6 +145,7 @@
//todo
return new Long(1);
}
+
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java Mon Aug 6 03:32:50 2007
@@ -0,0 +1,261 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AMQBindingURL implements BindingURL
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
+
+ String _url;
+ String _exchangeClass;
+ String _exchangeName;
+ String _destinationName;
+ String _queueName;
+ private HashMap<String, String> _options;
+
+ public AMQBindingURL(String url) throws URLSyntaxException
+ {
+ // format:
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Parsing URL: " + url);
+ }
+ _url = url;
+ _options = new HashMap<String, String>();
+ parseBindingURL();
+ }
+
+ private void parseBindingURL() throws URLSyntaxException
+ {
+ try
+ {
+ URI connection = new URI(_url);
+ String exchangeClass = connection.getScheme();
+ if (exchangeClass == null)
+ {
+ _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url;
+ // URLHelper.parseError(-1, "Exchange Class not specified.", _url);
+ parseBindingURL();
+ return;
+ }
+ else
+ {
+ setExchangeClass(exchangeClass);
+ }
+ String exchangeName = connection.getHost();
+ if (exchangeName == null)
+ {
+ if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ setExchangeName("");
+ }
+ else
+ {
+ throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ }
+ }
+ else
+ {
+ setExchangeName(exchangeName);
+ }
+ String queueName;
+ if ((connection.getPath() == null) || connection.getPath().equals(""))
+ {
+ throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ "Destination or Queue requried", _url);
+ }
+ else
+ {
+ int slash = connection.getPath().indexOf("/", 1);
+ if (slash == -1)
+ {
+ throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ "Destination requried", _url);
+ }
+ else
+ {
+ String path = connection.getPath();
+ setDestinationName(path.substring(1, slash));
+
+ // We don't set queueName yet as the actual value we use depends on options set
+ // when we are dealing with durable subscriptions
+
+ queueName = path.substring(slash + 1);
+
+ }
+ }
+
+ URLHelper.parseOptions(_options, connection.getQuery());
+ processOptions();
+ // We can now call setQueueName as the URL is full parsed.
+ setQueueName(queueName);
+ // Fragment is #string (not used)
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("URL Parsed: " + this);
+ }
+ }
+ catch (URISyntaxException uris)
+ {
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ }
+
+
+ private void processOptions()
+ {
+ // this is where we would parse any options that needed more than just storage.
+ }
+
+ public String getURL()
+ {
+ return _url;
+ }
+
+ public String getExchangeClass()
+ {
+ return _exchangeClass;
+ }
+
+ private void setExchangeClass(String exchangeClass)
+ {
+
+ _exchangeClass = exchangeClass;
+ if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ setOption(BindingURL.OPTION_EXCLUSIVE, "true");
+ }
+
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ private void setExchangeName(String name)
+ {
+ _exchangeName = name;
+ }
+
+ public String getDestinationName()
+ {
+ return _destinationName;
+ }
+
+ private void setDestinationName(String name)
+ {
+ _destinationName = name;
+ }
+
+ public String getQueueName()
+ {
+ return _queueName;
+ }
+
+ public void setQueueName(String name) throws URLSyntaxException
+ {
+ if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
+ {
+ if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
+ {
+ _queueName = getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+ }
+ else
+ {
+ throw URLHelper.parseError(-1,
+ "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".",
+ _url);
+
+ }
+ }
+ else
+ {
+ _queueName = null;
+ }
+ }
+ else
+ {
+ _queueName = name;
+ }
+
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public boolean containsOption(String key)
+ {
+ return _options.containsKey(key);
+ }
+
+ public String getRoutingKey()
+ {
+ if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return getQueueName();
+ }
+
+ if (containsOption(BindingURL.OPTION_ROUTING_KEY))
+ {
+ return getOption(OPTION_ROUTING_KEY);
+ }
+
+ return getDestinationName();
+ }
+
+ public void setRoutingKey(String key)
+ {
+ setOption(OPTION_ROUTING_KEY, key);
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(_exchangeClass);
+ sb.append("://");
+ sb.append(_exchangeName);
+ sb.append('/');
+ sb.append(_destinationName);
+ sb.append('/');
+ sb.append(_queueName);
+
+ sb.append(URLHelper.printOptions(_options));
+
+ return sb.toString();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java Mon Aug 6 03:32:50 2007
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/*
+ Binding URL format:
+ <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+*/
+public interface BindingURL
+{
+ public static final String OPTION_EXCLUSIVE = "exclusive";
+ public static final String OPTION_AUTODELETE = "autodelete";
+ public static final String OPTION_DURABLE = "durable";
+ public static final String OPTION_CLIENTID = "clientid";
+ public static final String OPTION_SUBSCRIPTION = "subscription";
+ public static final String OPTION_ROUTING_KEY = "routingkey";
+
+
+ String getURL();
+
+ String getExchangeClass();
+
+ String getExchangeName();
+
+ String getDestinationName();
+
+ String getQueueName();
+
+ String getOption(String key);
+
+ boolean containsOption(String key);
+
+ String getRoutingKey();
+
+ String toString();
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java Mon Aug 6 03:32:50 2007
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import java.util.HashMap;
+
+public class URLHelper
+{
+ public static char DEFAULT_OPTION_SEPERATOR = '&';
+ public static char ALTERNATIVE_OPTION_SEPARATOR = ',';
+ public static char BROKER_SEPARATOR = ';';
+
+ public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException
+ {
+ // options looks like this
+ // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
+
+ if ((options == null) || (options.indexOf('=') == -1))
+ {
+ return;
+ }
+
+ int optionIndex = options.indexOf('=');
+
+ String option = options.substring(0, optionIndex);
+
+ int length = options.length();
+
+ int nestedQuotes = 0;
+
+ // to store index of final "'"
+ int valueIndex = optionIndex;
+
+ // Walk remainder of url.
+ while ((nestedQuotes > 0) || (valueIndex < length))
+ {
+ valueIndex++;
+
+ if (valueIndex >= length)
+ {
+ break;
+ }
+
+ if (options.charAt(valueIndex) == '\'')
+ {
+ if ((valueIndex + 1) < options.length())
+ {
+ if ((options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR)
+ || (options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR)
+ || (options.charAt(valueIndex + 1) == BROKER_SEPARATOR)
+ || (options.charAt(valueIndex + 1) == '\''))
+ {
+ nestedQuotes--;
+
+ if (nestedQuotes == 0)
+ {
+ // We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options.charAt(valueIndex) == '\'')
+ {
+ nestedQuotes--;
+ }
+
+ break;
+ }
+ }
+ }
+
+ if ((nestedQuotes != 0) || (valueIndex < (optionIndex + 2)))
+ {
+ int sepIndex = 0;
+
+ // Try and identify illegal separator character
+ if (nestedQuotes > 1)
+ {
+ for (int i = 0; i < nestedQuotes; i++)
+ {
+ sepIndex = options.indexOf('\'', sepIndex);
+ sepIndex++;
+ }
+ }
+
+ if ((sepIndex >= options.length()) || (sepIndex == 0))
+ {
+ throw parseError(valueIndex, "Unterminated option", options);
+ }
+ else
+ {
+ throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'"
+ + options.charAt(sepIndex) + "'", options);
+ }
+ }
+
+ // optionIndex +2 to skip "='"
+ String value = options.substring(optionIndex + 2, valueIndex);
+
+ optionMap.put(option, value);
+
+ if (valueIndex < (options.length() - 1))
+ {
+ // Recurse to get remaining options
+ parseOptions(optionMap, options.substring(valueIndex + 2));
+ }
+ }
+
+ public static URLSyntaxException parseError(int index, String error, String url)
+ {
+ return parseError(index, 1, error, url);
+ }
+
+ public static URLSyntaxException parseError(int index, int length, String error, String url)
+ {
+ return new URLSyntaxException(url, error, index, length);
+ }
+
+ public static String printOptions(HashMap<String, String> options)
+ {
+ if (options.isEmpty())
+ {
+ return "";
+ }
+ else
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append('?');
+ for (String key : options.keySet())
+ {
+ sb.append(key);
+
+ sb.append("='");
+
+ sb.append(options.get(key));
+
+ sb.append("'");
+ sb.append(DEFAULT_OPTION_SEPERATOR);
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+
+ return sb.toString();
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java Mon Aug 6 03:32:50 2007
@@ -0,0 +1,94 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import java.net.URISyntaxException;
+
+public class URLSyntaxException extends URISyntaxException
+{
+ private int _length;
+
+ public URLSyntaxException(String url, String error, int index, int length)
+ {
+ super(url, error, index);
+
+ _length = length;
+ }
+
+ private static String getPositionString(int index, int length)
+ {
+ StringBuffer sb = new StringBuffer(index + 1);
+
+ for (int i = 0; i < index; i++)
+ {
+ sb.append(" ");
+ }
+
+ if (length > -1)
+ {
+ for (int i = 0; i < length; i++)
+ {
+ sb.append('^');
+ }
+ }
+
+ return sb.toString();
+ }
+
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(getReason());
+
+ if (getIndex() > -1)
+ {
+ if (_length != -1)
+ {
+ sb.append(" between indicies ");
+ sb.append(getIndex());
+ sb.append(" and ");
+ sb.append(_length);
+ }
+ else
+ {
+ sb.append(" at index ");
+ sb.append(getIndex());
+ }
+ }
+
+ sb.append(" ");
+ if (getIndex() != -1)
+ {
+ sb.append("\n");
+ }
+
+ sb.append(getInput());
+
+ if (getIndex() != -1)
+ {
+ sb.append("\n");
+ sb.append(getPositionString(getIndex(), _length));
+ }
+
+ return sb.toString();
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java
------------------------------------------------------------------------------
svn:eol-style = native