You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2008/02/27 06:15:21 UTC

svn commit: r631486 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid: client/ client/message/ jndi/

Author: rajith
Date: Tue Feb 26 21:15:20 2008
New Revision: 631486

URL: http://svn.apache.org/viewvc?rev=631486&view=rev
Log:
This contains the groundwork for QPID-803.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Tue Feb 26 21:15:20 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import java.net.URISyntaxException;
+
 import javax.jms.Destination;
 import javax.naming.NamingException;
 import javax.naming.Reference;
@@ -31,7 +33,6 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
 
 
 public abstract class AMQDestination implements Destination, Referenceable
@@ -50,6 +51,8 @@
 
     private AMQShortString _routingKey;
 
+    private AMQShortString[] _bindingKeys;
+
     private String _url;
     private AMQShortString _urlAsShortString;
 
@@ -64,7 +67,7 @@
     public static final Integer TOPIC_TYPE = Integer.valueOf(2);
     public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
 
-    protected AMQDestination(String url) throws URLSyntaxException
+    protected AMQDestination(String url) throws URISyntaxException
     {
         this(new AMQBindingURL(url));
     }
@@ -79,26 +82,43 @@
         _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
         _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
         _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
+        _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
     {
-        this(exchangeName, exchangeClass, routingKey, false, false, queueName);
+        this(exchangeName, exchangeClass, routingKey, false, false, queueName, null);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys)
+    {
+        this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
     {
-        this(exchangeName, exchangeClass, destinationName, false, false, null);
+        this(exchangeName, exchangeClass, destinationName, false, false, null,null);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
-                             boolean isAutoDelete, AMQShortString queueName)
+            boolean isAutoDelete, AMQShortString queueName)
     {
-        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
+        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
-                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+                             boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys)
+    {
+        this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+            boolean isAutoDelete, AMQShortString queueName, boolean isDurable){
+        this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
     {
         // If used with a fannout exchange, the routing key can be null
         if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
@@ -120,6 +140,7 @@
         _isAutoDelete = isAutoDelete;
         _queueName = queueName;
         _isDurable = isDurable;
+        _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
     }
 
     public AMQShortString getEncodedName()
@@ -181,6 +202,20 @@
         return _routingKey;
     }
 
+    public AMQShortString[] getBindingKeys()
+    {
+        if (_bindingKeys != null && _bindingKeys.length > 0)
+        {
+            return _bindingKeys;
+        }
+        else
+        {
+            // catering to the common use case where the
+            //routingKey is the same as the bindingKey.
+            return new AMQShortString[]{_routingKey};
+        }
+    }
+
     public boolean isExclusive()
     {
         return _isExclusive;
@@ -236,6 +271,21 @@
                 sb.append(BindingURL.OPTION_ROUTING_KEY);
                 sb.append("='");
                 sb.append(_routingKey).append("'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+
+            if (_bindingKeys != null && _bindingKeys.length>0)
+            {
+                sb.append(BindingURL.OPTION_BINDING_KEY);
+                sb.append("='");
+                for (AMQShortString bindingKey:_bindingKeys)
+                {
+
+                    sb.append(bindingKey).append(",");
+
+                }
+                sb.deleteCharAt(sb.length() - 1);
+                sb.append("'");
                 sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
             }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Tue Feb 26 21:15:20 2008
@@ -61,8 +61,14 @@
     public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
     {
         super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
-              false, queueName, false);    }
+              false, queueName, false);
+    }
 
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+    {
+        super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+              false, queueName, false,bindingKeys);
+    }
 
     /**
      * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
@@ -126,11 +132,15 @@
         this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
     }
 
-
     public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
     {
+        this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null);
+    }
+
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+    {
         super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
-              autoDelete, queueName, durable);
+              autoDelete, queueName, durable, bindingKeys);
     }
 
     public AMQShortString getRoutingKey()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 26 21:15:20 2008
@@ -22,6 +22,7 @@
 
 
 import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -29,6 +30,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -80,12 +82,9 @@
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
@@ -407,14 +406,14 @@
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                          final AMQShortString exchangeName) throws AMQException
+                          final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendQueueBind(queueName,routingKey,arguments,exchangeName);
+                sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
                 return null;
             }
         }, _connection).execute();
@@ -425,12 +424,12 @@
     {
         if( consumer.getQueuename() != null)
         {
-            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
         }
     }
 
     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName) throws AMQException, FailoverException;
+            final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
 
     /**
 
@@ -820,7 +819,7 @@
             {
                 return new AMQQueue(new AMQBindingURL(queueName));
             }
-            catch (URLSyntaxException urlse)
+            catch (URISyntaxException urlse)
             {
                 JMSException jmse = new JMSException(urlse.getReason());
                 jmse.setLinkedException(urlse);
@@ -1031,7 +1030,7 @@
             {
                 return new AMQTopic(new AMQBindingURL(topicName));
             }
-            catch (URLSyntaxException urlse)
+            catch (URISyntaxException urlse)
             {
                 JMSException jmse = new JMSException(urlse.getReason());
                 jmse.setLinkedException(urlse);
@@ -1165,7 +1164,7 @@
         AMQProtocolHandler protocolHandler = getProtocolHandler();
         declareExchange(amqd, protocolHandler, false);
         AMQShortString queueName = declareQueue(amqd, protocolHandler);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
+        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
     }
 
     /**
@@ -1556,6 +1555,9 @@
     public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException;
 
+
+    public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
+
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
      * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
@@ -2111,7 +2113,7 @@
         consumer.setQueuename(queueName);
 
         // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Feb 26 21:15:20 2008
@@ -41,6 +41,7 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
@@ -159,7 +160,7 @@
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-        
+
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
@@ -213,7 +214,7 @@
      * @param arguments    0_8 specific
      */
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
-                              final FieldTable arguments, final AMQShortString exchangeName)
+                              final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
             throws AMQException, FailoverException
     {
         Map args = FiledTableSupport.convertToMap(arguments);
@@ -222,7 +223,12 @@
         {
             args.put("x-match", "any");
         }
-        getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args);
+
+        for (AMQShortString rk: destination.getBindingKeys())
+        {
+            _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
+            getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+        }
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
         getCurrentException();
@@ -238,6 +244,7 @@
      */
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
+        getQpidSession().sync();
         getQpidSession().sessionClose();
         getCurrentException();
     }
@@ -350,19 +357,37 @@
     /**
      * Bind a queue with an exchange.
      */
-    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
-                                final AMQShortString routingKey) throws JMSException
+
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+    throws JMSException
+    {
+        return isQueueBound(exchangeName,queueName,routingKey,null);
+    }
+
+    public boolean isQueueBound(final AMQDestination destination) throws JMSException
+    {
+        return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
+    }
+
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
+    throws JMSException
     {
         String rk = "";
         boolean res;
-        if (routingKey != null)
+        if (bindingKeys != null && bindingKeys.length>0)
+        {
+            rk = bindingKeys[0].toString();
+        }
+        else if (routingKey != null)
         {
             rk = routingKey.toString();
         }
+
         Future<BindingQueryResult> result =
-                getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null);
+            getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null);
         BindingQueryResult bindingQueryResult = result.get();
-        if (routingKey == null)
+
+        if (rk == null)
         {
             res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
         }
@@ -577,7 +602,7 @@
         {
             // this is done so that we can produce to a temporary queue beofre we create a consumer
             sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
-            sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+            sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
             result.setQueueName(result.getRoutingKey());
         }
         catch (Exception e)
@@ -701,7 +726,7 @@
             AMQShortString topicName;
             if (topic instanceof AMQTopic)
             {
-                topicName=((AMQTopic) topic).getRoutingKey();
+                topicName=((AMQTopic) topic).getBindingKeys()[0];
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Feb 26 21:15:20 2008
@@ -94,7 +94,7 @@
     }
 
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName) throws AMQException, FailoverException
+            final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
     {
         AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
                 exchangeName, // exchange
@@ -176,6 +176,11 @@
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
+    }
+
+    public boolean isQueueBound(final AMQDestination destination) throws JMSException
+    {
+        return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
     }
 
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Tue Feb 26 21:15:20 2008
@@ -49,6 +49,11 @@
         super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
     }
 
+    public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+    {
+        super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys);
+    }
+
     public AMQTopic(AMQConnection conn, String routingKey)
     {
         this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
@@ -77,6 +82,11 @@
         super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
     }
 
+    protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+            boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
+    {
+        super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
+    }
 
     public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
             throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Feb 26 21:15:20 2008
@@ -121,8 +121,10 @@
         }
         catch (AMQException e)
         {
+            _logger.error("Receivecd an Exception when receiving message",e);
             try
             {
+
                 getSession().getAMQConnection().getExceptionListener()
                         .onException(new JMSAMQException("Error when receiving message", e));
             }
@@ -134,6 +136,7 @@
         }
         if (messageOk)
         {
+            _logger.debug("messageOk, trying to notify");
             super.notifyMessage(jmsMessage, channelId);
         }
     }
@@ -331,6 +334,9 @@
                 _logger.debug("filterMessage - trying to acquire message");
             }
             messageOk = acquireMessage(message);
+            _logger.debug("filterMessage - *************************************");
+            _logger.debug("filterMessage - message acquire status : " + messageOk);
+            _logger.debug("filterMessage - *************************************");
         }
         return messageOk;
     }
@@ -392,13 +398,29 @@
 
             _0_10session.getQpidSession()
                     .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+
+            _logger.debug("acquireMessage, sent acquire message to broker");
+
             _0_10session.getQpidSession().sync();
+
+            _logger.debug("acquireMessage, returned from sync");
+
             RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+
+            _logger.debug("acquireMessage, acquired range set " + acquired);
+
             if (acquired != null && acquired.size() > 0)
             {
                 result = true;
             }
+
+            _logger.debug("acquireMessage, Trying to get current exception ");
+
             _0_10session.getCurrentException();
+
+            _logger.debug("acquireMessage, returned from getting current exception ");
+
+            _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " );
         }
         return result;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Feb 26 21:15:20 2008
@@ -17,22 +17,22 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.njms.ExceptionHelper;
 import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.njms.ExceptionHelper;
 import org.apache.qpidity.transport.DeliveryProperties;
-
-import javax.jms.Message;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.ReplyTo;
 
 /**
  * This is a 0_10 message producer.
@@ -154,12 +154,20 @@
             String replyToURL = contentHeaderProperties.getReplyToAsString();
             if (replyToURL != null)
             {
+                if(_logger.isDebugEnabled())
+                {
+                    StringBuffer b = new StringBuffer();
+                    b.append("\n==========================");
+                    b.append("\nReplyTo : " + replyToURL);
+                    b.append("\n==========================");
+                    _logger.debug(b.toString());
+                }
                 AMQBindingURL dest;
                 try
                 {
                     dest = new AMQBindingURL(replyToURL);
                 }
-                catch (URLSyntaxException e)
+                catch (URISyntaxException e)
                 {
                     throw ExceptionHelper.convertQpidExceptionToJMSException(e);
                 }
@@ -198,8 +206,7 @@
 
     public boolean isBound(AMQDestination destination) throws JMSException
     {
-        return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
-                                     destination.getRoutingKey());
+        return _session.isQueueBound(destination);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Feb 26 21:15:20 2008
@@ -20,29 +20,34 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.commons.collections.map.ReferenceMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
 
-import org.apache.mina.common.ByteBuffer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
 
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-import java.io.IOException;
 
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
@@ -222,7 +227,7 @@
                     BindingURL binding = new AMQBindingURL(replyToEncoding);
                     dest = AMQDestination.createDestination(binding);
                 }
-                catch (URLSyntaxException e)
+                catch (URISyntaxException e)
                 {
                     throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
                 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Feb 26 21:15:20 2008
@@ -20,6 +20,24 @@
  */
 package org.apache.qpid.jndi;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQHeadersExchange;
@@ -30,27 +48,9 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class PropertiesFileInitialContextFactory implements InitialContextFactory
 {
     protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class);
@@ -99,7 +99,7 @@
             _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
         }
 
-        
+
 
         createConnectionFactories(data, environment);
 
@@ -185,6 +185,17 @@
                 Topic t = createTopic(entry.getValue().toString());
                 if (t != null)
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        StringBuffer b = new StringBuffer();
+                        b.append("Creating the topic: " + jndiName +  " with the following binding keys ");
+                        for (AMQShortString binding:((AMQTopic)t).getBindingKeys())
+                        {
+                            b.append(binding.toString()).append(",");
+                        }
+
+                        _logger.debug(b.toString());
+                    }
                     data.put(jndiName, t);
                 }
             }
@@ -218,9 +229,9 @@
         {
             binding = new AMQBindingURL(bindingURL);
         }
-        catch (URLSyntaxException urlse)
+        catch (URISyntaxException urlse)
         {
-            _logger.warn("Unable to destination:" + urlse);
+            _logger.warn("Unable to create destination:" + urlse, urlse);
 
             return null;
         }
@@ -269,7 +280,17 @@
         }
         else if (value instanceof String)
         {
-            return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
+            String[] keys = ((String)value).split(",");
+            AMQShortString[] bindings = new AMQShortString[keys.length];
+            int i = 0;
+            for (String key:keys)
+            {
+                bindings[i] = new AMQShortString(key);
+                i++;
+            }
+            // The Destination has a dual nature. If it is used by a producer, the key is used
+            // as the routing key. If it is used by a consumer, it becomes the binding key.
+            return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings);
         }
         else if (value instanceof BindingURL)
         {