You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/06 12:32:52 UTC

svn commit: r563097 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/nclient/ client/src/main/java/org/apache/qpid/nclient/impl/ client/src/main/java/org/apache/qpid/nclient/jms/ client/src/main/java/org/apache/qpid/nclient/jm...

Author: arnaudsimon
Date: Mon Aug  6 03:32:50 2007
New Revision: 563097

URL: http://svn.apache.org/viewvc?view=rev&rev=563097
Log: (empty)

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Mon Aug  6 03:32:50 2007
@@ -77,23 +77,24 @@
     //------------------------------------------------------
     /**
      * Transfer the given message to a specified exchange.
-     * <p> Following are the valid options for messageTransfer
-     * <ul>
-     * <li> CONFIRM
-     * <li> PRE_ACCQUIRE
-     * </ul>
-     * <p> In the absence of a particular option, the defaul value is:
-     * <ul>
-     * <li> CONFIRM = false
-     * <li> NO-ACCQUIRE
-     * </ul>
      *
-     * @param exchange The exchange the message is being sent.
-     * @param msg      The Message to be sent
-     * @param options  A list of valid options
+     * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul> <li> no-acquire  (0): the message must be explicitly acquired
+     *                    <p/>
+     *                    <li> pre-acquire (1): the message is acquired when the transfer starts
+     *                    </ul>
+     * @param exchange    The exchange the message is being sent.
+     * @param msg         The Message to be sent
      * @throws QpidException If the session fails to send the message due to some error
      */
-    public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException;
+    public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException;
 
     /**
      * Declare the beginning of a message transfer operation. This operation must
@@ -103,31 +104,31 @@
      * <p> In the interval [messageTransfer endData] any attempt to call a method other than
      * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close}
      * will result in an exception being thrown.
-     * <p> Following are the valid options for messageTransfer
-     * <ul>
-     * <li> CONFIRM
-     * <li> PRE_ACCQUIRE
-     * </ul>
-     * <p> In the absence of a particular option, the defaul value is:
-     * <ul>
-     * <li> CONFIRM = false
-     * <li> NO-ACCQUIRE
-     * </ul>
      *
-     * @param exchange The exchange the message is being sent.
-     * @param options  Set of options.
+     * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul> <li> no-acquire  (0): the message must be explicitly acquired
+     *                    <p/>
+     *                    <li> pre-acquire (1): the message is acquired when the transfer starts
+     *                    </ul>
+     * @param exchange    The exchange the message is being sent.
      * @throws QpidException If the session fails to send the message due to some error.
      */
-    public void messageTransfer(String exchange, Option... options) throws QpidException;
+    public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException;
 
     /**
      * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
-     * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent.
+     * or to the message being sent.
      *
      * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
      * @throws QpidException If the session fails to execute the method due to some error
      * @see org.apache.qpidity.DeliveryProperties
-     * @see org.apache.qpidity.ApplicationProperties
      */
     public void addMessageHeaders(Header... headers) throws QpidException;
 
@@ -371,7 +372,12 @@
      * <p>In the absence of a particular option, the defaul value is false for each option
      *
      * @param queueName         The name of the delcared queue.
-     * @param alternateExchange Alternate excahnge.
+     * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
+     *                          may be rejected by a queue for the following reasons:
+     *                          <oL> <li> The queue is deleted when it is not empty;
+     *                          <<li> Immediate delivery of a message is requested, but there are no consumers connected to
+     *                          the queue. </ol>
+     * @param arguments         Used for backward compatibility
      * @param options           Set of Options.
      * @throws QpidException If the session fails to declare the queue due to some error.
      * @see Option
@@ -385,6 +391,7 @@
      * @param queueName    The queue to be bound.
      * @param exchangeName The exchange name.
      * @param routingKey   The routing key.
+     * @param arguments    Used for backward compatibility
      * @throws QpidException If the session fails to bind the queue due to some error.
      */
     public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
@@ -396,6 +403,7 @@
      * @param queueName    The queue to be unbound.
      * @param exchangeName The exchange name.
      * @param routingKey   The routing key.
+     * @param arguments    Used for backward compatibility
      * @throws QpidException If the session fails to unbind the queue due to some error.
      */
     public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
@@ -448,9 +456,12 @@
      * <p/>
      * <p>In the absence of a particular option, the defaul value is false for each option</p>     *
      *
-     * @param exchangeName  The exchange name.
-     * @param exchangeClass The fully qualified name of the exchange class.
-     * @param options       Set of options.
+     * @param exchangeName      The exchange name.
+     * @param exchangeClass     The fully qualified name of the exchange class.
+     * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
+     *                          the message will be sent.
+     * @param options           Set of options.
+     * @param arguments         Used for backward compatibility
      * @throws QpidException If the session fails to declare the exchange due to some error.
      * @see Option
      */

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Mon Aug  6 03:32:50 2007
@@ -13,162 +13,192 @@
 public class ClientSession implements org.apache.qpid.nclient.Session
 {
 
-	Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();    
-	
+	Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
+
+
     //------------------------------------------------------
     //                 Session housekeeping methods
     //------------------------------------------------------
     public void close() throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void suspend() throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void resume() throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }//------------------------------------------------------
     //                 Messaging methods
     //                   Producer
     //------------------------------------------------------
-    public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException
+    public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageTransfer(String exchange, Option... options) throws QpidException
+    public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void addMessageHeaders(Header... headers) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void addData(byte[] data, int off, int len) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void endData() throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter,
-                                 Option... options) throws QpidException
+    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
+                                 MessagePartListener listener, Map<String, ?> filter, Option... options)
+            throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void messageCancel(String destination) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageAcknowledge(Range... range) throws QpidException
+    public void setMessageListener(String destination, MessagePartListener listener)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageReject(Range... range) throws QpidException
+    public void messageFlowMode(String destination, short mode) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public Range[] messageAcquire(Range... range) throws QpidException
+    public void messageFlow(String destination, short unit, long value) throws QpidException
     {
-        return new Range[0];  //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageRelease(Range... range) throws QpidException
+    public boolean messageFlush(String destination) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+        return false;
     }
 
+    public void messageStop(String destination) throws QpidException
+    {
+        // TODO
 
-    public void messageFlowMode(String destination, short mode)
+    }
+
+    public void messageAcknowledge(Range<Long>... range) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void messageFlow(String destination, short unit, long value)
+    public void messageReject(Range<Long>... range) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public boolean messageFlush(String destination)
+    public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+        return null;
     }
 
-    public void messageStop(String destination)
+    public void messageRelease(Range<Long>... range) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }// -----------------------------------------------
     //            Local transaction methods
     //  ----------------------------------------------
     public void txSelect() throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void txCommit() throws QpidException, IllegalStateException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void txRollback() throws QpidException, IllegalStateException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
-                             Option... options) throws QpidException
+    public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
+            throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
-                                                                                                              QpidException
+    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+            throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
-    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
-                                                                                                                QpidException
+    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+            throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void queuePurge(String queueName) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void queueDelete(String queueName, Option... options) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
                                 Map<String, ?> arguments, Option... options) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // TODO
+
     }
 
     public void exchangeDelete(String exchangeName, Option... options) throws QpidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-    
-    public void setMessageListener(String destination,MessagePartListener listener)
-    {
-    	messagListeners.put(destination, listener);
+        // TODO
+
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java Mon Aug  6 03:32:50 2007
@@ -17,8 +17,11 @@
  */
 package org.apache.qpid.nclient.jms;
 
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.url.BindingURL;
+
 import javax.jms.Destination;
-import javax.jms.JMSException;
 
 /**
  * Implementation of the JMS Destination interface
@@ -35,24 +38,60 @@
      */
     protected SessionImpl _session;
 
+    /**
+     * The excahnge name
+     */
+    protected String _exchangeName;
+
+     /**
+     * The excahnge class
+     */
+    protected String _exchangeClass;
+
+     /**
+     * The queu name
+     */
+    protected String _queueName;
+
     //--- Constructor
     /**
      * Create a new DestinationImpl with a given name.
      *
-     * @param name The name of this destination.
+     * @param name    The name of this destination.
      * @param session The session used to create this destination.
-     * @throws JMSException If the destiantion name is not valid 
+     * @throws QpidException If the destiantion name is not valid
      */
-    protected DestinationImpl(SessionImpl session,  String name)  throws JMSException
+    protected DestinationImpl(SessionImpl session, String name) throws QpidException
     {
-        // TODO validate that this destination name exists
-        //_session.getQpidSession()
         _session = session;
         _name = name;
     }
 
+    /**
+     * Create a destiantion from a binding URL
+     *
+     * @param session The session used to create this queue.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        _session = session;
+        _exchangeName = binding.getExchangeName();
+        _exchangeClass = binding.getExchangeClass();
+        _name = binding.getDestinationName();
+        //       _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+        boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+        boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+        _queueName = binding.getQueueName();
+        // create this exchange
+        _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null,
+                                                  isDurable ? Option.DURABLE : Option.NO_OPTION,
+                                                  isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION);
+    }
+
     //---- Getters and Setters
-    
+
     /**
      * Gets the name of this destination.
      *
@@ -84,5 +123,20 @@
         return _name;
     }
 
+    // getter methods 
+    public String getQpidQueueName()
+    {
+        return _queueName;
+    }
+
+    public String getExchangeName()
+    {
+        return _exchangeName;
+    }
+
+    public String getExchangeClass()
+    {
+        return _exchangeClass;
+    }
 }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Mon Aug  6 03:32:50 2007
@@ -17,8 +17,6 @@
  */
 package org.apache.qpid.nclient.jms;
 
-//import org.apache.qpid.nclient.api.MessageReceiver;
-
 import org.apache.qpid.nclient.jms.message.QpidMessage;
 import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter;
 import org.apache.qpid.nclient.jms.filter.MessageFilter;
@@ -27,6 +25,7 @@
 import org.apache.qpidity.Range;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Option;
+import org.apache.qpidity.exchange.ExchangeDefaults;
 
 import javax.jms.*;
 
@@ -120,15 +119,16 @@
         _noLocal = noLocal;
         _subscriptionName = subscriptionName;
         _isStopped = getSession().isStopped();
+        // let's create a message part assembler
+        /**
+         * A Qpid message listener that pushes messages to this consumer session when this consumer is
+         * asynchronous or directly to this consumer when it is synchronously accessed.
+         */
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+
         if (destination instanceof Queue)
         {
             // this is a queue we expect that this queue exists
-            // let's create a message part assembler
-            /**
-             * A Qpid message listener that pushes messages to this consumer session when this consumer is
-             * asynchronous or directly to this consumer when it is synchronously accessed.
-             */
-            MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
             getSession().getQpidSession()
                     .messageSubscribe(destination.getName(), getMessageActorID(),
                                       org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED,
@@ -144,25 +144,44 @@
         {
             // this is a topic we need to create a temporary queue for this consumer
             // unless this is a durable subscriber
+            String queueName;
             if (subscriptionName != null)
             {
                 // this ia a durable subscriber
                 // create a persistent queue for this subscriber
-                // getSession().getQpidSession().queueDeclare(destination.getName());
+                queueName = "topic-" + subscriptionName;
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
             }
             else
             {
                 // this is a non durable subscriber
                 // create a temporary queue
-
+                queueName = "topic-" + getMessageActorID();
+                getSession().getQpidSession()
+                        .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
             }
+            // bind this queue with the topic exchange
+            getSession().getQpidSession()
+                    .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null);
+            // subscribe to this topic 
+            getSession().getQpidSession()
+                    .messageSubscribe(queueName, getMessageActorID(),
+                                      org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED,
+                                      // We always acquire the messages
+                                      org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
+                                      _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+                                      // Request exclusive subscription access, meaning only this subscription
+                                      // can access the queue.
+                                      Option.EXCLUSIVE);
+
         }
         // set the flow mode
         getSession().getQpidSession()
                 .messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
     }
-    //----- Message consumer API
 
+    //----- Message consumer API
     /**
      * Gets this  MessageConsumer's message selector.
      *
@@ -426,7 +445,14 @@
             {
                 messageOk = _filter.matches(message.getJMSMessage());
             }
-            // right now we need to acquire this message if needed
+            if (!messageOk && _preAcquire)
+            {
+                // this is the case for topics
+                // We need to ack this message
+                acknowledgeMessage(message);
+            }
+            // now we need to acquire this message if needed
+            // this is the case of queue with a message selector set
             if (!_preAcquire && messageOk)
             {
                 messageOk = acquireMessage(message);
@@ -568,5 +594,20 @@
             }
         }
         return result;
+    }
+
+    /**
+     * Acknowledge a message
+     *
+     * @param message The message to be acknowledged
+     * @throws QpidException If the message cannot be acquired due to some internal error.
+     */
+    private void acknowledgeMessage(QpidMessage message) throws QpidException
+    {
+        if (!_preAcquire)
+        {
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+            getSession().getQpidSession().messageAcknowledge(range);
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java Mon Aug  6 03:32:50 2007
@@ -17,96 +17,300 @@
  */
 package org.apache.qpid.nclient.jms;
 
-import javax.jms.MessageProducer;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.Message;
+import javax.jms.*;
 
 /**
- *  Implements  MessageProducer
+ * Implements  MessageProducer
  */
 public class MessageProducerImpl extends MessageActor implements MessageProducer
 {
+    /**
+     * If true, messages will not get a timestamp.
+     */
+    private boolean _disableTimestamps = false;
+
+    /**
+     * Priority of messages created by this producer.
+     */
+    private int _messagePriority = Message.DEFAULT_PRIORITY;
+
+    /**
+     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+     */
+    private long _timeToLive;
+
+    /**
+     * Delivery mode used for this producer.
+     */
+    private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+    /**
+     * Speicify whether the messageID is disable
+     */
+    private boolean _disableMessageId = false;
 
+    //-- constructors
     public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
     {
         super(session, destination);
     }
 
-    // Interface javax.jms.MessageProducer
-
-    public void setDisableMessageID(boolean b) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
+    //--- Interface javax.jms.MessageProducer
+    /**
+     * Sets whether message IDs are disabled.
+     *
+     * @param value Specify whether the MessageID must be disabled
+     * @throws JMSException If disabling messageID fails due to some internal error.
+     */
+    public void setDisableMessageID(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableMessageId = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return true is messageID is disabled, false otherwise
+     * @throws JMSException If getting whether messagID is disabled fails due to some internal error.
+     */
     public boolean getDisableMessageID() throws JMSException
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void setDisableMessageTimestamp(boolean b) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _disableMessageId;
     }
 
+    /**
+     * Sets whether message timestamps are disabled.
+     * <P> JMS spec says:
+     * <p> Since timestamps take some effort to create and increase a
+     * message's size, some JMS providers may be able to optimize message
+     * overhead if they are given a hint that the timestamp is not used by an
+     * application....
+     * these messages must have the timestamp set to zero; if the provider
+     * ignores the hint, the timestamp must be set to its normal value.
+     * <p>Message timestamps are enabled by default.
+     *
+     * @param value Indicates if message timestamps are disabled
+     * @throws JMSException if disabling the timestamps fails due to some internal error.
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException
+    {
+        checkNotClosed();
+        _disableTimestamps = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
+     */
     public boolean getDisableMessageTimestamp() throws JMSException
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void setDeliveryMode(int i) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _disableTimestamps;
     }
 
+    /**
+     * Sets the producer's default delivery mode.
+     * <p> JMS specification says:
+     * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
+     *
+     * @param deliveryMode The message delivery mode for this message producer; legal
+     *                     values are {@link DeliveryMode#NON_PERSISTENT}
+     *                     and {@link DeliveryMode#PERSISTENT}.
+     * @throws JMSException if setting the delivery mode fails due to some internal error.
+     */
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkNotClosed();
+        if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT))
+        {
+            throw new JMSException(
+                    "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
+        }
+        _deliveryMode = deliveryMode;
+    }
+
+    /**
+     * Gets the producer's delivery mode.
+     *
+     * @return The message delivery mode for this message producer
+     * @throws JMSException If getting the delivery mode fails due to some internal error.
+     */
     public int getDeliveryMode() throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void setPriority(int i) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _deliveryMode;
     }
 
+    /**
+     * Sets the producer's message priority.
+     * <p> The jms spec says:
+     * <p> The JMS API defines ten levels of priority value, with 0 as the
+     * lowest priority and 9 as the highest. Clients should consider priorities
+     * 0-4 as gradations of normal priority and priorities 5-9 as gradations
+     * of expedited priority.
+     * <p> Priority is set to 4 by default.
+     *
+     * @param priority The message priority for this message producer; must be a value between 0 and 9
+     * @throws JMSException if setting this producer priority fails due to some internal error.
+     */
+    public void setPriority(int priority) throws JMSException
+    {
+        checkNotClosed();
+        if ((priority < 0) || (priority > 9))
+        {
+            throw new IllegalArgumentException(
+                    "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+        }
+        _messagePriority = priority;
+    }
+
+    /**
+     * Gets the producer's message priority.
+     *
+     * @return The message priority for this message producer.
+     * @throws JMSException If getting this producer message priority fails due to some internal error.
+     */
     public int getPriority() throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void setTimeToLive(long l) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _messagePriority;
     }
 
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     * <p> The JMS spec says that time to live must be set to zero by default.
+     *
+     * @param timeToLive The message time to live in milliseconds; zero is unlimited
+     * @throws JMSException If setting the default time to live fails due to some internal error.
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkNotClosed();
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        _timeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return The default message time to live in milliseconds; zero is unlimited
+     * @throws JMSException if getting the default time to live fails due to some internal error.
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
     public long getTimeToLive() throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _timeToLive;
     }
 
+    /**
+     * Gets the destination associated with this producer.
+     *
+     * @return This producer's destination.
+     * @throws JMSException If getting the destination for this producer fails
+     *                      due to some internal error.
+     */
     public Destination getDestination() throws JMSException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        checkNotClosed();
+        return _destination;
     }
 
+    /**
+     * Sends a message using the producer's default delivery mode, priority, destination
+     * and time to live.
+     *
+     * @param message the message to be sent
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
     public void send(Message message) throws JMSException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void send(Message message, int i, int i1, long l) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+        send(message, _deliveryMode, _messagePriority, _timeToLive);
     }
 
+    /**
+     * Sends a message to this producer default destination, specifying delivery mode,
+     * priority, and time to live.
+     *
+     * @param message      The message to send
+     * @param deliveryMode The delivery mode to use
+     * @param priority     The priority for this message
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If this producer's destination is invalid.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     If a client uses this method with a producer that did
+     *                                     not specify a destination at creation time.
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        send(_destination, message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Sends a message to a specified destination using this producer's default
+     * delivery mode, priority and time to live.
+     * <p/>
+     * <P>Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination The destination to send this message to
+     * @param message     The message to send
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
     public void send(Destination destination, Message message) throws JMSException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
     }
 
-    public void send(Destination destination, Message message, int i, int i1, long l) throws JMSException
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
+    /**
+     * Sends a message to a destination specifying delivery mode, priority and time to live.
+     *
+     * @param destination  The destination to send this message to.
+     * @param message      The message to be sent.
+     * @param deliveryMode The delivery mode to use.
+     * @param priority     The priority for this message.
+     * @param timeToLive   The message's lifetime (in milliseconds)
+     * @throws JMSException                If sending the message fails due to some internal error.
+     * @throws MessageFormatException      If an invalid message is specified.
+     * @throws InvalidDestinationException If an invalid destination is specified.
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkNotClosed();
+        getSession().checkDestination(destination);
+        // Do not allow negative timeToLive values
+        if (timeToLive < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
+        }
+        // check that the message is not a foreign one
+
+        // set the properties
+
+        //
+
+        // dispatch it
+        // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java Mon Aug  6 03:32:50 2007
@@ -17,6 +17,11 @@
  */
 package org.apache.qpid.nclient.jms;
 
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
 import javax.jms.Queue;
 import javax.jms.JMSException;
 
@@ -32,15 +37,32 @@
      *
      * @param name    The name of this queue.
      * @param session The session used to create this queue.
-     * @throws JMSException If the queue name is not valid
+     * @throws QpidException If the queue name is not valid
      */
-    protected QueueImpl(SessionImpl session, String name) throws JMSException
+    protected QueueImpl(SessionImpl session, String name) throws QpidException
     {
         super(session, name);
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+        _queueName = name;
+        // check that this queue exist on the server
+        // As pasive is set the server will not create the queue.
+        session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE);
     }
 
-    //---- Interface javax.jms.Queue
+    /**
+     * Create a destiantion from a binding URL
+     *
+     * @param session  The session used to create this queue.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(session, binding);        
+    }
 
+    //---- Interface javax.jms.Queue
     /**
      * Gets the name of this queue.
      *

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Mon Aug  6 03:32:50 2007
@@ -569,7 +569,7 @@
     {
         checkNotClosed();
         checkDestination(destination);
-        MessageConsumerImpl consumer = null;
+        MessageConsumerImpl consumer;
         try
         {
             consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
@@ -602,7 +602,16 @@
     public Queue createQueue(String queueName) throws JMSException
     {
         checkNotClosed();
-        return new QueueImpl(this, queueName);
+        Queue result;
+        try
+        {
+            result = new QueueImpl(this, queueName);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
     /**
@@ -624,7 +633,16 @@
     public Topic createTopic(String topicName) throws JMSException
     {
         checkNotClosed();
-        return new TopicImpl(this, topicName);
+        Topic result;
+        try
+        {
+            result = new TopicImpl(this, topicName);
+        }
+        catch (QpidException e)
+        {
+           throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
     /**
@@ -713,25 +731,43 @@
     }
 
     /**
-     * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier.
+     * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier.
      *
      * @return A temporary queue.
      * @throws JMSException If creating the temporary queue fails due to some internal error.
      */
     public TemporaryQueue createTemporaryQueue() throws JMSException
     {
-        return new TemporaryQueueImpl(this);
+        TemporaryQueue result;
+        try
+        {
+            result = new TemporaryQueueImpl(this);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
     /**
-     * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier.
+     * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier.
      *
      * @return A temporary topic.
      * @throws JMSException If creating the temporary topic fails due to some internal error.
      */
     public TemporaryTopic createTemporaryTopic() throws JMSException
     {
-        return new TemporaryTopicImpl(this);
+        TemporaryTopic result;
+        try
+        {
+            result = new TemporaryTopicImpl(this);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java Mon Aug  6 03:32:50 2007
@@ -17,13 +17,18 @@
  */
 package org.apache.qpid.nclient.jms;
 
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+
 import javax.jms.TemporaryQueue;
 import javax.jms.JMSException;
+import java.util.UUID;
 
 /**
  * Implements TemporaryQueue
  */
-public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination
+public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination
 {
     /**
      * Indicates whether this temporary queue is deleted.
@@ -32,16 +37,23 @@
 
     //--- constructor
 
-     /**
+    /**
      * Create a new TemporaryQueueImpl with a given name.
      *
      * @param session The session used to create this TemporaryQueueImpl.
-     * @throws JMSException If creating the TemporaryQueueImpl fails due to some error.
+     * @throws QpidException If creating the TemporaryQueueImpl fails due to some error.
      */
-    public TemporaryQueueImpl(SessionImpl session) throws JMSException
+    protected TemporaryQueueImpl(SessionImpl session) throws QpidException
     {
-        // temporary destinations do not have names and are not registered in the JNDI namespace.
+        // temporary destinations do not have names
         super(session, "NAME_NOT_SET");
+        _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+        _queueName = "TempQueue-" + UUID.randomUUID();
+        // check that this queue exist on the server
+        // As pasive is set the server will not create the queue.
+        session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE);
+        session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null);
     }
 
     //-- TemporaryDestination Interface
@@ -59,11 +71,22 @@
     /**
      * Delete this temporary destinaiton
      *
-     * @throws JMSException If deleting this temporary queue fails due to some error. 
+     * @throws JMSException If deleting this temporary queue fails due to some error.
      */
     public void delete() throws JMSException
     {
         // todo delete this temporary queue
         _isDeleted = true;
+    }
+
+    //---- Interface javax.jms.Queue
+    /**
+     * Gets the name of this queue.
+     *
+     * @return This queue's name.
+     */
+    public String getQueueName() throws JMSException
+    {
+        return super.getName();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java Mon Aug  6 03:32:50 2007
@@ -17,6 +17,8 @@
  */
 package org.apache.qpid.nclient.jms;
 
+import org.apache.qpidity.QpidException;
+
 import javax.jms.TemporaryTopic;
 import javax.jms.JMSException;
 
@@ -36,9 +38,9 @@
      * Create a new TemporaryTopicImpl with a given name.
      *
      * @param session The session used to create this TemporaryTopicImpl.
-     * @throws JMSException If creating the TemporaryTopicImpl fails due to some error.
+     * @throws QpidException If creating the TemporaryTopicImpl fails due to some error.
      */
-    public TemporaryTopicImpl(SessionImpl session) throws JMSException
+    protected TemporaryTopicImpl(SessionImpl session) throws QpidException
     {
         // temporary destinations do not have names.
         super(session, "NAME_NOT_SET");

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java Mon Aug  6 03:32:50 2007
@@ -17,8 +17,11 @@
  */
 package org.apache.qpid.nclient.jms;
 
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.url.BindingURL;
+
 import javax.jms.Topic;
-import javax.jms.JMSException;
 
 /**
  * Implementation of the javax.jms.Topic interface.
@@ -29,16 +32,30 @@
     /**
      * Create a new TopicImpl with a given name.
      *
-     * @param name The name of this topic
+     * @param name    The name of this topic
      * @param session The session used to create this queue.
-     * @throws JMSException If the topic name is not valid
+     * @throws QpidException If the topic name is not valid
      */
-    public TopicImpl(SessionImpl session, String name) throws JMSException
+    public TopicImpl(SessionImpl session, String name) throws QpidException
     {
         super(session, name);
+        _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+        _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+    }
+
+    /**
+     * Create a TopicImpl from a binding URL
+     *
+     * @param session The session used to create this Topic.
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException
+    {
+        super(session, binding);
     }
 
-    //--- javax.jsm.Topic Interface 
+    //--- javax.jsm.Topic Interface
     /**
      * Gets the name of this topic.
      *

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java Mon Aug  6 03:32:50 2007
@@ -33,17 +33,14 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.*;
 
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Map;
 import java.util.UUID;
 
-public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message
+public abstract class AbstractJMSMessage extends QpidMessage implements Message
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java?view=diff&rev=563097&r1=563096&r2=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java Mon Aug  6 03:32:50 2007
@@ -145,6 +145,7 @@
         //todo
         return new Long(1);
     }
+
 }
 
 

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java Mon Aug  6 03:32:50 2007
@@ -0,0 +1,261 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AMQBindingURL implements BindingURL
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
+
+    String _url;
+    String _exchangeClass;
+    String _exchangeName;
+    String _destinationName;
+    String _queueName;
+    private HashMap<String, String> _options;
+
+    public AMQBindingURL(String url) throws URLSyntaxException
+    {
+        // format:
+        // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Parsing URL: " + url);
+        }
+        _url = url;
+        _options = new HashMap<String, String>();
+        parseBindingURL();
+    }
+
+    private void parseBindingURL() throws URLSyntaxException
+    {
+        try
+        {
+            URI connection = new URI(_url);
+            String exchangeClass = connection.getScheme();
+            if (exchangeClass == null)
+            {
+                _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url;
+                // URLHelper.parseError(-1, "Exchange Class not specified.", _url);
+                parseBindingURL();
+                return;
+            }
+            else
+            {
+                setExchangeClass(exchangeClass);
+            }
+            String exchangeName = connection.getHost();
+            if (exchangeName == null)
+            {
+                if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+                {
+                    setExchangeName("");
+                }
+                else
+                {
+                    throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+                }
+            }
+            else
+            {
+                setExchangeName(exchangeName);
+            }
+            String queueName;
+            if ((connection.getPath() == null) || connection.getPath().equals(""))
+            {
+                throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+                                           "Destination or Queue requried", _url);
+            }
+            else
+            {
+                int slash = connection.getPath().indexOf("/", 1);
+                if (slash == -1)
+                {
+                    throw URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+                                               "Destination requried", _url);
+                }
+                else
+                {
+                    String path = connection.getPath();
+                    setDestinationName(path.substring(1, slash));
+
+                    // We don't set queueName yet as the actual value we use depends on options set
+                    // when we are dealing with durable subscriptions
+
+                    queueName = path.substring(slash + 1);
+
+                }
+            }
+
+            URLHelper.parseOptions(_options, connection.getQuery());
+            processOptions();
+            // We can now call setQueueName as the URL is full parsed.
+            setQueueName(queueName);
+            // Fragment is #string (not used)
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("URL Parsed: " + this);
+            }
+        }
+        catch (URISyntaxException uris)
+        {
+            throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+        }
+    }
+
+
+    private void processOptions()
+    {
+        // this is where we would parse any options that needed more than just storage.
+    }
+
+    public String getURL()
+    {
+        return _url;
+    }
+
+    public String getExchangeClass()
+    {
+        return _exchangeClass;
+    }
+
+    private void setExchangeClass(String exchangeClass)
+    {
+
+        _exchangeClass = exchangeClass;
+        if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+        {
+            setOption(BindingURL.OPTION_EXCLUSIVE, "true");
+        }
+
+    }
+
+    public String getExchangeName()
+    {
+        return _exchangeName;
+    }
+
+    private void setExchangeName(String name)
+    {
+        _exchangeName = name;
+    }
+
+    public String getDestinationName()
+    {
+        return _destinationName;
+    }
+
+    private void setDestinationName(String name)
+    {
+        _destinationName = name;
+    }
+
+    public String getQueueName()
+    {
+        return _queueName;
+    }
+
+    public void setQueueName(String name) throws URLSyntaxException
+    {
+        if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+        {
+            if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
+            {
+                if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
+                {
+                    _queueName = getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+                }
+                else
+                {
+                    throw URLHelper.parseError(-1,
+                                               "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".",
+                                               _url);
+
+                }
+            }
+            else
+            {
+                _queueName = null;
+            }
+        }
+        else
+        {
+            _queueName = name;
+        }
+
+    }
+
+    public String getOption(String key)
+    {
+        return _options.get(key);
+    }
+
+    public void setOption(String key, String value)
+    {
+        _options.put(key, value);
+    }
+
+    public boolean containsOption(String key)
+    {
+        return _options.containsKey(key);
+    }
+
+    public String getRoutingKey()
+    {
+        if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+        {
+            return getQueueName();
+        }
+
+        if (containsOption(BindingURL.OPTION_ROUTING_KEY))
+        {
+            return getOption(OPTION_ROUTING_KEY);
+        }
+
+        return getDestinationName();
+    }
+
+    public void setRoutingKey(String key)
+    {
+        setOption(OPTION_ROUTING_KEY, key);
+    }
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(_exchangeClass);
+        sb.append("://");
+        sb.append(_exchangeName);
+        sb.append('/');
+        sb.append(_destinationName);
+        sb.append('/');
+        sb.append(_queueName);
+
+        sb.append(URLHelper.printOptions(_options));
+
+        return sb.toString();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/AMQBindingURL.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java Mon Aug  6 03:32:50 2007
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/*
+    Binding URL format:
+    <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+*/
+public interface BindingURL
+{
+    public static final String OPTION_EXCLUSIVE = "exclusive";
+    public static final String OPTION_AUTODELETE = "autodelete";
+    public static final String OPTION_DURABLE = "durable";
+    public static final String OPTION_CLIENTID = "clientid";
+    public static final String OPTION_SUBSCRIPTION = "subscription";
+    public static final String OPTION_ROUTING_KEY = "routingkey";
+
+
+    String getURL();
+
+   String getExchangeClass();
+
+    String getExchangeName();
+
+    String getDestinationName();
+
+    String getQueueName();
+
+    String getOption(String key);
+
+    boolean containsOption(String key);
+
+    String getRoutingKey();
+
+    String toString();
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/BindingURL.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java Mon Aug  6 03:32:50 2007
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import java.util.HashMap;
+
+public class URLHelper
+{
+    public static char DEFAULT_OPTION_SEPERATOR = '&';
+    public static char ALTERNATIVE_OPTION_SEPARATOR = ',';
+    public static char BROKER_SEPARATOR = ';';
+
+    public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException
+    {
+        // options looks like this
+        // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
+
+        if ((options == null) || (options.indexOf('=') == -1))
+        {
+            return;
+        }
+
+        int optionIndex = options.indexOf('=');
+
+        String option = options.substring(0, optionIndex);
+
+        int length = options.length();
+
+        int nestedQuotes = 0;
+
+        // to store index of final "'"
+        int valueIndex = optionIndex;
+
+        // Walk remainder of url.
+        while ((nestedQuotes > 0) || (valueIndex < length))
+        {
+            valueIndex++;
+
+            if (valueIndex >= length)
+            {
+                break;
+            }
+
+            if (options.charAt(valueIndex) == '\'')
+            {
+                if ((valueIndex + 1) < options.length())
+                {
+                    if ((options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR)
+                            || (options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR)
+                            || (options.charAt(valueIndex + 1) == BROKER_SEPARATOR)
+                            || (options.charAt(valueIndex + 1) == '\''))
+                    {
+                        nestedQuotes--;
+
+                        if (nestedQuotes == 0)
+                        {
+                            // We've found the value of an option
+                            break;
+                        }
+                    }
+                    else
+                    {
+                        nestedQuotes++;
+                    }
+                }
+                else
+                {
+                    // We are at the end of the string
+                    // Check to see if we are corectly closing quotes
+                    if (options.charAt(valueIndex) == '\'')
+                    {
+                        nestedQuotes--;
+                    }
+
+                    break;
+                }
+            }
+        }
+
+        if ((nestedQuotes != 0) || (valueIndex < (optionIndex + 2)))
+        {
+            int sepIndex = 0;
+
+            // Try and identify illegal separator character
+            if (nestedQuotes > 1)
+            {
+                for (int i = 0; i < nestedQuotes; i++)
+                {
+                    sepIndex = options.indexOf('\'', sepIndex);
+                    sepIndex++;
+                }
+            }
+
+            if ((sepIndex >= options.length()) || (sepIndex == 0))
+            {
+                throw parseError(valueIndex, "Unterminated option", options);
+            }
+            else
+            {
+                throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'"
+                    + options.charAt(sepIndex) + "'", options);
+            }
+        }
+
+        // optionIndex +2 to skip "='"
+        String value = options.substring(optionIndex + 2, valueIndex);
+
+        optionMap.put(option, value);
+
+        if (valueIndex < (options.length() - 1))
+        {
+            // Recurse to get remaining options
+            parseOptions(optionMap, options.substring(valueIndex + 2));
+        }
+    }
+
+    public static URLSyntaxException parseError(int index, String error, String url)
+    {
+        return parseError(index, 1, error, url);
+    }
+
+    public static URLSyntaxException parseError(int index, int length, String error, String url)
+    {
+        return new URLSyntaxException(url, error, index, length);
+    }
+
+    public static String printOptions(HashMap<String, String> options)
+    {
+        if (options.isEmpty())
+        {
+            return "";
+        }
+        else
+        {
+            StringBuffer sb = new StringBuffer();
+            sb.append('?');
+            for (String key : options.keySet())
+            {
+                sb.append(key);
+
+                sb.append("='");
+
+                sb.append(options.get(key));
+
+                sb.append("'");
+                sb.append(DEFAULT_OPTION_SEPERATOR);
+            }
+
+            sb.deleteCharAt(sb.length() - 1);
+
+            return sb.toString();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java?view=auto&rev=563097
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java Mon Aug  6 03:32:50 2007
@@ -0,0 +1,94 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.url;
+
+import java.net.URISyntaxException;
+
+public class URLSyntaxException extends URISyntaxException
+{
+    private int _length;
+
+    public URLSyntaxException(String url, String error, int index, int length)
+    {
+        super(url, error, index);
+
+        _length = length;
+    }
+
+    private static String getPositionString(int index, int length)
+    {
+        StringBuffer sb = new StringBuffer(index + 1);
+
+        for (int i = 0; i < index; i++)
+        {
+            sb.append(" ");
+        }
+
+        if (length > -1)
+        {
+            for (int i = 0; i < length; i++)
+            {
+                sb.append('^');
+            }
+        }
+
+        return sb.toString();
+    }
+
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(getReason());
+
+        if (getIndex() > -1)
+        {
+            if (_length != -1)
+            {
+                sb.append(" between indicies ");
+                sb.append(getIndex());
+                sb.append(" and ");
+                sb.append(_length);
+            }
+            else
+            {
+                sb.append(" at index ");
+                sb.append(getIndex());
+            }
+        }
+
+        sb.append(" ");
+        if (getIndex() != -1)
+        {
+            sb.append("\n");
+        }
+
+        sb.append(getInput());
+
+        if (getIndex() != -1)
+        {
+            sb.append("\n");
+            sb.append(getPositionString(getIndex(), _length));
+        }
+
+        return sb.toString();
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/url/URLSyntaxException.java
------------------------------------------------------------------------------
    svn:eol-style = native