You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 17:12:09 UTC

svn commit: r632040 [2/2] - in /incubator/qpid/branches/thegreatmerge: ./ qpid/cpp/ qpid/cpp/rubygen/ qpid/cpp/rubygen/0-10/ qpid/cpp/rubygen/99-0/ qpid/cpp/rubygen/templates/ qpid/cpp/src/ qpid/cpp/src/qpid/ qpid/cpp/src/qpid/amqp_0_10/ qpid/cpp/src/q...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Feb 28 08:11:52 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import java.net.URISyntaxException;
+
 import javax.jms.Destination;
 import javax.naming.NamingException;
 import javax.naming.Reference;
@@ -31,7 +33,6 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
 
 
 public abstract class AMQDestination implements Destination, Referenceable
@@ -50,6 +51,8 @@
 
     private AMQShortString _routingKey;
 
+    private AMQShortString[] _bindingKeys;
+
     private String _url;
     private AMQShortString _urlAsShortString;
 
@@ -64,7 +67,7 @@
     public static final Integer TOPIC_TYPE = Integer.valueOf(2);
     public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
 
-    protected AMQDestination(String url) throws URLSyntaxException
+    protected AMQDestination(String url) throws URISyntaxException
     {
         this(new AMQBindingURL(url));
     }
@@ -79,26 +82,43 @@
         _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
         _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
         _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
+        _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
     {
-        this(exchangeName, exchangeClass, routingKey, false, false, queueName);
+        this(exchangeName, exchangeClass, routingKey, false, false, queueName, null);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys)
+    {
+        this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
     {
-        this(exchangeName, exchangeClass, destinationName, false, false, null);
+        this(exchangeName, exchangeClass, destinationName, false, false, null,null);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+            boolean isAutoDelete, AMQShortString queueName)
+    {
+        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
-                             boolean isAutoDelete, AMQShortString queueName)
+                             boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys)
     {
-        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
+        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+            boolean isAutoDelete, AMQShortString queueName, boolean isDurable){
+        this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
-                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
     {
         // If used with a fannout exchange, the routing key can be null
         if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
@@ -120,6 +140,7 @@
         _isAutoDelete = isAutoDelete;
         _queueName = queueName;
         _isDurable = isDurable;
+        _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
     }
 
     public AMQShortString getEncodedName()
@@ -181,6 +202,20 @@
         return _routingKey;
     }
 
+    public AMQShortString[] getBindingKeys()
+    {
+        if (_bindingKeys != null && _bindingKeys.length > 0)
+        {
+            return _bindingKeys;
+        }
+        else
+        {
+            // catering to the common use case where the
+            //routingKey is the same as the bindingKey.
+            return new AMQShortString[]{_routingKey};
+        }
+    }
+
     public boolean isExclusive()
     {
         return _isExclusive;
@@ -237,6 +272,21 @@
                 sb.append("='");
                 sb.append(_routingKey).append("'");
                 sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+
+            // We can't allow both routingKey and bindingKey
+            if (_routingKey == null && _bindingKeys != null && _bindingKeys.length>0)
+            {
+
+                for (AMQShortString bindingKey:_bindingKeys)
+                {
+                    sb.append(BindingURL.OPTION_BINDING_KEY);
+                    sb.append("='");
+                    sb.append(bindingKey);
+                    sb.append("'");
+                    sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+
+                }
             }
 
             if (_isDurable)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Thu Feb 28 08:11:52 2008
@@ -61,8 +61,14 @@
     public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
     {
         super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
-              false, queueName, false);    }
+              false, queueName, false);
+    }
 
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+    {
+        super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+              false, queueName, false,bindingKeys);
+    }
 
     /**
      * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
@@ -126,11 +132,15 @@
         this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
     }
 
-
     public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
     {
+        this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null);
+    }
+
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+    {
         super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
-              autoDelete, queueName, durable);
+              autoDelete, queueName, durable, bindingKeys);
     }
 
     public AMQShortString getRoutingKey()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 08:11:52 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -86,7 +87,6 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -564,19 +564,14 @@
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                          final AMQShortString exchangeName) throws AMQException
+                          final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                    QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments);
-
-                    AMQFrame queueBind = body.generateFrame(_channelId);
-
-                    getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
-
+                sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
                 return null;
             }
         }, _connection).execute();
@@ -587,12 +582,12 @@
     {
         if( consumer.getQueuename() != null)
         {
-            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
         }
     }
 
     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName) throws AMQException, FailoverException;
+            final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
 
     /**
 
@@ -1036,7 +1031,7 @@
             {
                 return new AMQQueue(new AMQBindingURL(queueName));
             }
-            catch (URLSyntaxException urlse)
+            catch (URISyntaxException urlse)
             {
                 JMSException jmse = new JMSException(urlse.getReason());
                 jmse.setLinkedException(urlse);
@@ -1253,7 +1248,7 @@
             {
                 return new AMQTopic(new AMQBindingURL(topicName));
             }
-            catch (URLSyntaxException urlse)
+            catch (URISyntaxException urlse)
             {
                 JMSException jmse = new JMSException(urlse.getReason());
                 jmse.setLinkedException(urlse);
@@ -1380,6 +1375,16 @@
         }
     }
 
+    public void declareAndBind(AMQDestination amqd)
+            throws
+            AMQException
+    {
+        AMQProtocolHandler protocolHandler = getProtocolHandler();
+        declareExchange(amqd, protocolHandler, false);
+        AMQShortString queueName = declareQueue(amqd, protocolHandler);
+        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+    }
+
     /**
      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
      *
@@ -1820,35 +1825,10 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-            throws JMSException
-    {
-        try
-        {
-            AMQMethodEvent response =
-                new FailoverRetrySupport<AMQMethodEvent, AMQException>(
-                    new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
-                    {
-                        public AMQMethodEvent execute() throws AMQException, FailoverException
-                        {
-                            ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName);
-                            AMQFrame boundFrame = body.generateFrame(_channelId);
-
-                                    return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
-
-                                }
-                            }, _connection).execute();
-
-            // Extract and return the response code from the query.
-            ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-
-            return (responseBody.getReplyCode() == 0);
-        }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
-        }
-    }
+    public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+            throws JMSException;
+            
+    public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
 
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -2509,16 +2489,6 @@
         }
     }
 
-    public void declareAndBind(AMQDestination amqd)
-            throws
-            AMQException
-    {
-        AMQProtocolHandler protocolHandler = getProtocolHandler();
-        declareExchange(amqd, protocolHandler, false);
-        AMQShortString queueName = declareQueue(amqd, protocolHandler);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
-    }
-
     /**
      * Callers must hold the failover mutex before calling this method.
      *
@@ -2540,7 +2510,7 @@
         consumer.setQueuename(queueName);
 
         // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 28 08:11:52 2008
@@ -41,6 +41,7 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
@@ -159,7 +160,7 @@
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-        
+
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
@@ -213,7 +214,7 @@
      * @param arguments    0_8 specific
      */
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
-                              final FieldTable arguments, final AMQShortString exchangeName)
+                              final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
             throws AMQException, FailoverException
     {
         Map args = FiledTableSupport.convertToMap(arguments);
@@ -222,7 +223,12 @@
         {
             args.put("x-match", "any");
         }
-        getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args);
+
+        for (AMQShortString rk: destination.getBindingKeys())
+        {
+            _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
+            getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+        }
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
         getCurrentException();
@@ -238,6 +244,7 @@
      */
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
+        getQpidSession().sync();
         getQpidSession().sessionClose();
         getCurrentException();
     }
@@ -350,19 +357,37 @@
     /**
      * Bind a queue with an exchange.
      */
-    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
-                                final AMQShortString routingKey) throws JMSException
+
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+    throws JMSException
+    {
+        return isQueueBound(exchangeName,queueName,routingKey,null);
+    }
+
+    public boolean isQueueBound(final AMQDestination destination) throws JMSException
+    {
+        return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
+    }
+
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
+    throws JMSException
     {
         String rk = "";
         boolean res;
-        if (routingKey != null)
+        if (bindingKeys != null && bindingKeys.length>0)
+        {
+            rk = bindingKeys[0].toString();
+        }
+        else if (routingKey != null)
         {
             rk = routingKey.toString();
         }
+
         Future<BindingQueryResult> result =
-                getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null);
+            getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null);
         BindingQueryResult bindingQueryResult = result.get();
-        if (routingKey == null)
+
+        if (rk == null)
         {
             res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
         }
@@ -577,7 +602,7 @@
         {
             // this is done so that we can produce to a temporary queue beofre we create a consumer
             sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
-            sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+            sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
             result.setQueueName(result.getRoutingKey());
         }
         catch (Exception e)
@@ -701,7 +726,7 @@
             AMQShortString topicName;
             if (topic instanceof AMQTopic)
             {
-                topicName=((AMQTopic) topic).getRoutingKey();
+                topicName=((AMQTopic) topic).getBindingKeys()[0];
             }
             else
             {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 08:11:52 2008
@@ -94,7 +94,7 @@
     }
 
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName) throws AMQException, FailoverException
+            final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
     {
         getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
                                         (getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -169,6 +169,11 @@
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
+    }
+
+    public boolean isQueueBound(final AMQDestination destination) throws JMSException
+    {
+        return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
     }
 
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Feb 28 08:11:52 2008
@@ -49,6 +49,11 @@
         super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
     }
 
+    public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+    {
+        super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys);
+    }
+
     public AMQTopic(AMQConnection conn, String routingKey)
     {
         this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
@@ -77,6 +82,11 @@
         super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
     }
 
+    protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+            boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
+    {
+        super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
+    }
 
     public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
             throws JMSException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 08:11:52 2008
@@ -51,7 +51,7 @@
     /** The connection being used by this consumer */
     protected final AMQConnection _connection;
 
-    private final String _messageSelector;
+    protected final String _messageSelector;
 
     private final boolean _noLocal;
 
@@ -740,6 +740,8 @@
             }
             else
             {
+                // we should not be allowed to add a message is the
+                // consumer is closed
                 _synchronousQueue.put(jmsMessage);
             }
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 08:11:52 2008
@@ -38,7 +38,6 @@
 import javax.jms.MessageListener;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.Iterator;
 
 /**
@@ -122,8 +121,10 @@
         }
         catch (AMQException e)
         {
+            _logger.error("Receivecd an Exception when receiving message",e);
             try
             {
+
                 getSession().getAMQConnection().getExceptionListener()
                         .onException(new JMSAMQException("Error when receiving message", e));
             }
@@ -135,6 +136,7 @@
         }
         if (messageOk)
         {
+            _logger.debug("messageOk, trying to notify");
             super.notifyMessage(jmsMessage);
         }
     }
@@ -290,7 +292,7 @@
         // TODO Use a tag for fiding out if message filtering is done here or by the broker.
         try
         {
-            if (getMessageSelector() != null && !getMessageSelector().equals(""))
+            if (_messageSelector != null && !_messageSelector.equals(""))
             {
                 messageOk = _filter.matches(message);
             }
@@ -332,6 +334,9 @@
                 _logger.debug("filterMessage - trying to acquire message");
             }
             messageOk = acquireMessage(message);
+            _logger.debug("filterMessage - *************************************");
+            _logger.debug("filterMessage - message acquire status : " + messageOk);
+            _logger.debug("filterMessage - *************************************");
         }
         return messageOk;
     }
@@ -393,13 +398,29 @@
 
             _0_10session.getQpidSession()
                     .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+
+            _logger.debug("acquireMessage, sent acquire message to broker");
+
             _0_10session.getQpidSession().sync();
+
+            _logger.debug("acquireMessage, returned from sync");
+
             RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+
+            _logger.debug("acquireMessage, acquired range set " + acquired);
+
             if (acquired != null && acquired.size() > 0)
             {
                 result = true;
             }
+
+            _logger.debug("acquireMessage, Trying to get current exception ");
+
             _0_10session.getCurrentException();
+
+            _logger.debug("acquireMessage, returned from getting current exception ");
+
+            _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " );
         }
         return result;
     }
@@ -473,4 +494,4 @@
             _session.rejectMessage(message, true);
         }
     }
-}
\ No newline at end of file
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Feb 28 08:11:52 2008
@@ -17,22 +17,22 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.njms.ExceptionHelper;
 import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.njms.ExceptionHelper;
 import org.apache.qpidity.transport.DeliveryProperties;
-
-import javax.jms.Message;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.ReplyTo;
 
 /**
  * This is a 0_10 message producer.
@@ -154,12 +154,20 @@
             String replyToURL = contentHeaderProperties.getReplyToAsString();
             if (replyToURL != null)
             {
+                if(_logger.isDebugEnabled())
+                {
+                    StringBuffer b = new StringBuffer();
+                    b.append("\n==========================");
+                    b.append("\nReplyTo : " + replyToURL);
+                    b.append("\n==========================");
+                    _logger.debug(b.toString());
+                }
                 AMQBindingURL dest;
                 try
                 {
                     dest = new AMQBindingURL(replyToURL);
                 }
-                catch (URLSyntaxException e)
+                catch (URISyntaxException e)
                 {
                     throw ExceptionHelper.convertQpidExceptionToJMSException(e);
                 }
@@ -198,8 +206,7 @@
 
     public boolean isBound(AMQDestination destination) throws JMSException
     {
-        return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
-                                     destination.getRoutingKey());
+        return _session.isQueueBound(destination);
     }
 }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Feb 28 08:11:52 2008
@@ -20,30 +20,34 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.commons.collections.map.ReferenceMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
 
-import org.apache.mina.common.ByteBuffer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
 
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-import java.io.IOException;
-import java.net.URISyntaxException;
 
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Feb 28 08:11:52 2008
@@ -140,8 +140,7 @@
         props.setType(mprop.getType());
         props.setUserId(mprop.getUserId());
         props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));        
-        AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
-        message.receivedFromServer();
+        AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);        
         return message;
     }
 
@@ -152,7 +151,7 @@
     {
         final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
         msg.setJMSRedelivered(redelivered);
-
+        msg.receivedFromServer();
         return msg;
     }
 
@@ -164,7 +163,7 @@
         final AbstractJMSMessage msg =
                 create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
         msg.setJMSRedelivered(redelivered);
-
+        msg.receivedFromServer();
         return msg;
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Feb 28 08:11:52 2008
@@ -20,6 +20,24 @@
  */
 package org.apache.qpid.jndi;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQHeadersExchange;
@@ -30,28 +48,9 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class PropertiesFileInitialContextFactory implements InitialContextFactory
 {
     protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class);
@@ -184,6 +183,17 @@
                 Topic t = createTopic(entry.getValue().toString());
                 if (t != null)
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        StringBuffer b = new StringBuffer();
+                        b.append("Creating the topic: " + jndiName +  " with the following binding keys ");
+                        for (AMQShortString binding:((AMQTopic)t).getBindingKeys())
+                        {
+                            b.append(binding.toString()).append(",");
+                        }
+
+                        _logger.debug(b.toString());
+                    }
                     data.put(jndiName, t);
                 }
             }
@@ -219,7 +229,7 @@
         }
         catch (URISyntaxException urlse)
         {
-            _logger.warn("Unable to destination:" + urlse);
+            _logger.warn("Unable to create destination:" + urlse, urlse);
 
             return null;
         }
@@ -268,7 +278,17 @@
         }
         else if (value instanceof String)
         {
-            return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
+            String[] keys = ((String)value).split(",");
+            AMQShortString[] bindings = new AMQShortString[keys.length];
+            int i = 0;
+            for (String key:keys)
+            {
+                bindings[i] = new AMQShortString(key);
+                i++;
+            }
+            // The Destination has a dual nature. If this was used for a producer the key is used
+            // for the routing key. If it was used for the consumer it becomes the bindingKey
+            return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings);
         }
         else if (value instanceof BindingURL)
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Feb 28 08:11:52 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,12 +23,18 @@
 import junit.framework.TestCase;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.unit.basic.PropertyValueTest;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
 
 public class DestinationURLTest extends TestCase
 {
-    public void testFullURL() throws URLSyntaxException
+    private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class);
+
+    public void testFullURL() throws URISyntaxException
     {
 
         String url = "exchange.Class://exchangeName/Destination/Queue";
@@ -43,7 +49,7 @@
         assertTrue(dest.getQueueName().equals("Queue"));
     }
 
-    public void testQueue() throws URLSyntaxException
+    public void testQueue() throws URISyntaxException
     {
 
         String url = "exchangeClass://exchangeName//Queue";
@@ -58,7 +64,7 @@
         assertTrue(dest.getQueueName().equals("Queue"));
     }
 
-    public void testQueueWithOption() throws URLSyntaxException
+    public void testQueueWithOption() throws URISyntaxException
     {
 
         String url = "exchangeClass://exchangeName//Queue?option='value'";
@@ -75,7 +81,7 @@
     }
 
 
-    public void testDestination() throws URLSyntaxException
+    public void testDestination() throws URISyntaxException
     {
 
         String url = "exchangeClass://exchangeName/Destination/";
@@ -90,7 +96,7 @@
         assertTrue(dest.getQueueName().equals(""));
     }
 
-    public void testDestinationWithOption() throws URLSyntaxException
+    public void testDestinationWithOption() throws URISyntaxException
     {
 
         String url = "exchangeClass://exchangeName/Destination/?option='value'";
@@ -107,7 +113,7 @@
         assertTrue(dest.getOption("option").equals("value"));
     }
 
-    public void testDestinationWithMultiOption() throws URLSyntaxException
+    public void testDestinationWithMultiOption() throws URISyntaxException
     {
 
         String url = "exchangeClass://exchangeName/Destination/?option='value',option2='value2'";
@@ -123,7 +129,7 @@
         assertTrue(dest.getOption("option2").equals("value2"));
     }
 
-    public void testDestinationWithNoExchangeDefaultsToDirect() throws URLSyntaxException
+    public void testDestinationWithNoExchangeDefaultsToDirect() throws URISyntaxException
     {
 
         String url = "IBMPerfQueue1?durable='true'";
@@ -136,6 +142,41 @@
         assertTrue(dest.getQueueName().equals("IBMPerfQueue1"));
 
         assertTrue(dest.getOption("durable").equals("true"));
+    }
+
+    public void testDestinationWithMultiBindingKeys() throws URISyntaxException
+    {
+
+        String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'";
+
+        AMQBindingURL dest = new AMQBindingURL(url);
+
+        assertTrue(dest.getExchangeClass().equals("exchangeClass"));
+        assertTrue(dest.getExchangeName().equals("exchangeName"));
+        assertTrue(dest.getDestinationName().equals("Destination"));
+        assertTrue(dest.getQueueName().equals(""));
+
+        assertTrue(dest.getBindingKeys().length == 2);
+    }
+
+    // You can only specify only a routing key or binding key, but not both.
+    public void testDestinationIfOnlyRoutingKeyOrBindingKeyIsSpecified() throws URISyntaxException
+    {
+
+        String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',routingkey='key2'";
+        boolean exceptionThrown = false;
+        try
+        {
+
+            AMQBindingURL dest = new AMQBindingURL(url);
+        }
+        catch(URISyntaxException e)
+        {
+            exceptionThrown = true;
+            _logger.info("Exception thrown",e);
+        }
+
+        assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown);
     }
 
     public static junit.framework.Test suite()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java Thu Feb 28 08:11:52 2008
@@ -20,28 +20,29 @@
  */
 package org.apache.qpid.url;
 
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
 public class AMQBindingURL implements BindingURL
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
 
     String _url;
-    AMQShortString _exchangeClass;
-    AMQShortString _exchangeName;
-    AMQShortString _destinationName;
-    AMQShortString _queueName;
+    AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+    AMQShortString _exchangeName = new AMQShortString("");
+    AMQShortString _destinationName = new AMQShortString("");;
+    AMQShortString _queueName = new AMQShortString("");
+    AMQShortString[] _bindingKeys = new AMQShortString[0];
     private HashMap<String, String> _options;
 
-    public AMQBindingURL(String url) throws URLSyntaxException
+    public AMQBindingURL(String url) throws URISyntaxException
     {
         // format:
         // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
@@ -52,116 +53,35 @@
         parseBindingURL();
     }
 
-    private void parseBindingURL() throws URLSyntaxException
+    private void parseBindingURL() throws URISyntaxException
     {
-        try
-        {
-            URI connection = new URI(_url);
-
-            String exchangeClass = connection.getScheme();
-
-            if (exchangeClass == null)
-            {
-                _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url;
-                // URLHelper.parseError(-1, "Exchange Class not specified.", _url);
-                parseBindingURL();
-
-                return;
-            }
-            else
-            {
-                setExchangeClass(exchangeClass);
-            }
-
-            String exchangeName = connection.getHost();
-
-            if (exchangeName == null)
-            {
-                if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
-                {
-                    setExchangeName("");
-                }
-                else
-                {
-                    throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
-                }
-            }
-            else
-            {
-                setExchangeName(exchangeName);
-            }
-
-            String queueName;
-
-            if ((connection.getPath() == null) || connection.getPath().equals(""))
-            {
-                throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
-                    "Destination or Queue requried", _url);
-            }
-            else
-            {
-                int slash = connection.getPath().indexOf("/", 1);
-                if (slash == -1)
-                {
-                    throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
-                        "Destination requried", _url);
-                }
-                else
-                {
-                    String path = connection.getPath();
-                    setDestinationName(path.substring(1, slash));
-
-                    // We don't set queueName yet as the actual value we use depends on options set
-                    // when we are dealing with durable subscriptions
-
-                    queueName = path.substring(slash + 1);
-
-                }
-            }
-
-            URLHelper.parseOptions(_options, connection.getQuery());
-
-            processOptions();
-
-            // We can now call setQueueName as the URL is full parsed.
-
-            setQueueName(queueName);
-
-            // Fragment is #string (not used)
-            _logger.debug("URL Parsed: " + this);
-
-        }
-        catch (URISyntaxException uris)
-        {
-
-            throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
-
-        }
+        BindingURLParser parser = new BindingURLParser(_url,this);
+        processOptions();
+        _logger.debug("URL Parsed: " + this);
     }
 
-    private void setExchangeClass(String exchangeClass)
+    public void setExchangeClass(String exchangeClass)
     {
         setExchangeClass(new AMQShortString(exchangeClass));
     }
 
-    private void setQueueName(String name) throws URLSyntaxException
+    public void setQueueName(String name)
     {
         setQueueName(new AMQShortString(name));
     }
 
-    private void setDestinationName(String name)
+    public void setDestinationName(String name)
     {
         setDestinationName(new AMQShortString(name));
     }
 
-    private void setExchangeName(String exchangeName)
+    public void setExchangeName(String exchangeName)
     {
         setExchangeName(new AMQShortString(exchangeName));
     }
 
-    private void processOptions()
+    private void processOptions() throws URISyntaxException
     {
-        // this is where we would parse any options that needed more than just storage.
     }
 
     public String getURL()
@@ -210,34 +130,9 @@
         return _queueName;
     }
 
-    public void setQueueName(AMQShortString name) throws URLSyntaxException
+    public void setQueueName(AMQShortString name)
     {
-        if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
-        {
-            if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
-            {
-                if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
-                {
-                    _queueName =
-                        new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
-                }
-                else
-                {
-                    throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID
-                        + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
-
-                }
-            }
-            else
-            {
-                _queueName = null;
-            }
-        }
-        else
-        {
-            _queueName = name;
-        }
-
+        _queueName = name;
     }
 
     public String getOption(String key)
@@ -261,7 +156,7 @@
         {
             if (containsOption(BindingURL.OPTION_ROUTING_KEY))
             {
-                return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+                return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
             }
             else
             {
@@ -271,12 +166,29 @@
 
         if (containsOption(BindingURL.OPTION_ROUTING_KEY))
         {
-            return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+            return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
         }
 
         return getDestinationName();
     }
 
+    public AMQShortString[] getBindingKeys()
+    {
+        if (_bindingKeys != null && _bindingKeys.length>0)
+        {
+            return _bindingKeys;
+        }
+        else
+        {
+            return new AMQShortString[]{getRoutingKey()};
+        }
+    }
+
+    public void setBindingKeys(AMQShortString[] keys)
+    {
+        _bindingKeys = keys;
+    }
+
     public void setRoutingKey(AMQShortString key)
     {
         setOption(OPTION_ROUTING_KEY, key.toString());
@@ -296,6 +208,29 @@
 
         sb.append(URLHelper.printOptions(_options));
 
-        return sb.toString();
+        // temp hack
+        if (getRoutingKey() == null || getRoutingKey().toString().equals(""))
+        {
+
+            if (sb.toString().indexOf("?") == -1)
+            {
+                sb.append("?");
+            }
+            else
+            {
+                sb.append("&");
+            }
+
+            for (AMQShortString key :_bindingKeys)
+            {
+                sb.append(BindingURL.OPTION_BINDING_KEY).append("='").append(key.toString()).append("'&");
+            }
+
+            return sb.toString().substring(0,sb.toString().length()-1);
+        }
+        else
+        {
+            return sb.toString();
+        }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Thu Feb 28 08:11:52 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,6 +34,7 @@
     public static final String OPTION_CLIENTID = "clientid";
     public static final String OPTION_SUBSCRIPTION = "subscription";
     public static final String OPTION_ROUTING_KEY = "routingkey";
+    public static final String OPTION_BINDING_KEY = "bindingkey";
 
 
     String getURL();
@@ -51,6 +52,8 @@
     boolean containsOption(String key);
 
     AMQShortString getRoutingKey();
+
+    AMQShortString[] getBindingKeys();
 
     String toString();
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/module.xml?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/module.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/module.xml Thu Feb 28 08:11:52 2008
@@ -158,6 +158,7 @@
   </target>
 
   <property name="test" value="*Test"/>
+  <property name="test.mem" value="512M"/>
 
   <property name="log" value="info"/>
   <property name="amqj.logging.level" value="${log}"/>
@@ -189,6 +190,8 @@
   <target name="test" depends="compile-tests" if="module.test.src.exists"
           description="execute unit tests">
     <junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000">
+
+      <jvmarg value="-Xmx${test.mem}" />
 
       <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
       <sysproperty key="root.logging.level" value="${root.logging.level}"/>

Modified: incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py Thu Feb 28 08:11:52 2008
@@ -72,7 +72,7 @@
       cls = cls.base
     return False
 
-  def dispatch(self, f):
+  def dispatch(self, f, attrs = ""):
     cls = self
     while cls != None:
       if hasattr(f, cls.type):
@@ -81,7 +81,6 @@
         cls = cls.base
 
     cls = self
-    attrs = ""
     while cls != None:
       if attrs:
         sep = ", "
@@ -151,9 +150,10 @@
 
   def dispatch(self, f):
     try:
-      method = getattr(f, "do_" + self.name)
+      attr = "do_" + self.name
+      method = getattr(f, attr)
     except AttributeError:
-      return Dispatcher.dispatch(self, f)
+      return Dispatcher.dispatch(self, f, "'%s'" % attr)
     return method(self)
 
 class Leaf(Component, Dispatcher):

Modified: incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py Thu Feb 28 08:11:52 2008
@@ -24,36 +24,51 @@
 """
 
 from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
 
 class Closed(Exception): pass
 
 class Queue(BaseQueue):
 
   END = object()
+  STOP = object()
 
   def __init__(self, *args, **kwargs):
     BaseQueue.__init__(self, *args, **kwargs)
-    self._real_put = self.put
-    self.listener = self._real_put
+    self.listener = None
+    self.thread = None
 
   def close(self):
     self.put(Queue.END)
 
   def get(self, block = True, timeout = None):
-    self.put = self._real_put
-    try:
-      result = BaseQueue.get(self, block, timeout)
-      if result == Queue.END:
-        # this guarantees that any other waiting threads or any future
-        # calls to get will also result in a Closed exception
-        self.put(Queue.END)
-        raise Closed()
-      else:
-        return result
-    finally:
-      self.put = self.listener
-      pass
+    result = BaseQueue.get(self, block, timeout)
+    if result == Queue.END:
+      # this guarantees that any other waiting threads or any future
+      # calls to get will also result in a Closed exception
+      self.put(Queue.END)
+      raise Closed()
+    else:
+      return result
 
   def listen(self, listener):
     self.listener = listener
-    self.put = self.listener
+    if listener == None:
+      if self.thread != None:
+        self.put(Queue.STOP)
+        self.thread.join()
+        self.thread = None
+    else:
+      if self.thread == None:
+        self.thread = Thread(target = self.run)
+        self.thread.setDaemon(True)
+        self.thread.start()
+
+  def run(self):
+    while True:
+      try:
+        o = self.get()
+        if o == Queue.STOP: break
+        self.listener(o)
+      except Closed:
+        break

Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py Thu Feb 28 08:11:52 2008
@@ -30,37 +30,32 @@
   # all the queue functionality.
 
   def test_listen(self):
-    LISTEN = object()
-    GET = object()
-    EMPTY = object()
+    values = []
+    heard = threading.Event()
+    def listener(x):
+      values.append(x)
+      heard.set()
 
     q = Queue(0)
-    values = []
-    q.listen(lambda x: values.append((LISTEN, x)))
+    q.listen(listener)
+    heard.clear()
     q.put(1)
-    assert values[-1] == (LISTEN, 1)
+    heard.wait()
+    assert values[-1] == 1
+    heard.clear()
     q.put(2)
-    assert values[-1] == (LISTEN, 2)
-
-    class Getter(threading.Thread):
+    heard.wait()
+    assert values[-1] == 2
 
-      def run(self):
-        try:
-          values.append((GET, q.get(timeout=10)))
-        except Empty:
-          values.append(EMPTY)
-
-    g = Getter()
-    g.start()
-    # let the other thread reach the get
-    time.sleep(2)
+    q.listen(None)
     q.put(3)
-    g.join()
-
-    assert values[-1] == (GET, 3)
+    assert q.get(3) == 3
+    q.listen(listener)
 
+    heard.clear()
     q.put(4)
-    assert values[-1] == (LISTEN, 4)
+    heard.wait()
+    assert values[-1] == 4
 
   def test_close(self):
     q = Queue(0)