You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/05/28 05:18:53 UTC

svn commit: r949083 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQDestination.java AMQSession_0_10.java BasicMessageConsumer_0_10.java messaging/address/AddressHelper.java messaging/address/Link.java

Author: rajith
Date: Fri May 28 03:18:52 2010
New Revision: 949083

URL: http://svn.apache.org/viewvc?rev=949083&view=rev
Log:
1. Capacity can now be specified as "capacity : {source: 5, target: 10}" in addition to "capacity:10", where both source and target are set to 10.
2. If the exchange type is direct and no subject is set, then the routing_key is set to "" instead of throwing an exception.
3. Added a fix to infer the exchange type if specified in the x-declares section.
4. The link can now specify an optional 'name' parameter which will be used as the queue name if present.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri May 28 03:18:52 2010
@@ -139,7 +139,7 @@ public abstract class AMQDestination imp
     protected Node _targetNode;
     protected Node _sourceNode;
     protected Link _targetLink;
-    protected Link _sourceLink;    
+    protected Link _link;    
     // ----- / Fields required to support new address syntax -------
     
     static
@@ -739,14 +739,14 @@ public abstract class AMQDestination imp
         _sourceNode = node;
     }
 
-    public Link getSourceLink()
+    public Link getLink()
     {
-        return _sourceLink;
+        return _link;
     }
 
-    public void setSourceLink(Link link)
+    public void setLink(Link link)
     {
-        _sourceLink = link;
+        _link = link;
     }
     
     public void setExchangeName(AMQShortString name)
@@ -792,7 +792,7 @@ public abstract class AMQDestination imp
         _addressType = _addrHelper.getTargetNodeType();         
         _targetNode =  _addrHelper.getTargetNode(_addressType);
         _sourceNode = _addrHelper.getSourceNode(_addressType);
-        _sourceLink = _addrHelper.getLink();       
+        _link = _addrHelper.getLink();       
     }
     
     // This method is needed if we didn't know the node type at the beginning.

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri May 28 03:18:52 2010
@@ -612,9 +612,9 @@ public class AMQSession_0_10 extends AMQ
     {
         long capacity = 0;
         if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getSourceLink().getCapacity() > 0)
+                destination.getLink().getConsumerCapacity() > 0)
         {
-            capacity = destination.getSourceLink().getCapacity();
+            capacity = destination.getLink().getConsumerCapacity();
         }
         else if (prefetch())
         {
@@ -1229,10 +1229,10 @@ public class AMQSession_0_10 extends AMQ
                 dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY);
                 dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString());
             }
-            else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+            else
             {
-               throw new AMQException("If sending to an exchange of type direct," +
-               		" a valid subject must be specified");
+                dest.setRoutingKey(new AMQShortString(""));
+                dest.setSubject("");
             }
         }
     }
@@ -1242,9 +1242,10 @@ public class AMQSession_0_10 extends AMQ
         QueueNode node = (QueueNode)dest.getSourceNode();  // source node is never null
         if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
         {
-            // can name : my-queue be used in x-declare?
-            // if so set it to dest queue name
-            // if (dest.getQueueName() == null) { dest.setName(node.getName()) }
+            if (dest.getLink() != null && dest.getLink().getName() != null) 
+            {
+                dest.setQueueName(new AMQShortString(dest.getLink().getName())); 
+            }
             send0_10QueueDeclare(dest,null,false,false);
         }
         node.addBinding(new Binding(dest.getAddressName(),

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri May 28 03:18:52 2010
@@ -106,9 +106,9 @@ public class BasicMessageConsumer_0_10 e
         
         // Destination setting overrides connection defaults
         if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getSourceLink().getCapacity() > 0)
+                destination.getLink().getConsumerCapacity() > 0)
         {
-            capacity = destination.getSourceLink().getCapacity();
+            capacity = destination.getLink().getConsumerCapacity();
         }
         else if (getSession().prefetch())
         {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Fri May 28 03:18:52 2010
@@ -58,6 +58,8 @@ public class AddressHelper
     public static final String BINDINGS = "bindings";
     public static final String BROWSE_ONLY = "browse";
     public static final String CAPACITY = "capacity";
+    public static final String CAPACITY_SOURCE = "source";
+    public static final String CAPACITY_TARGET = "target";
     public static final String NAME = "name";
     public static final String EXCHANGE = "exchange";
     public static final String QUEUE = "queue";
@@ -220,11 +222,7 @@ public class AddressHelper
         {
             return AMQDestination.QUEUE_TYPE;
         }
-        else if ((nodeProps.getString(TYPE).equals("topic") ||
-                 nodeProps.getString(TYPE).equals("direct") ||
-                 nodeProps.getString(TYPE).equals("fanout") ||
-                 nodeProps.getString(TYPE).equals("match") ||  
-                 nodeProps.getString(TYPE).equals("xml")) )
+        else if (nodeProps.getString(TYPE).equals("topic"))
         {
             return AMQDestination.TOPIC_TYPE;
         }
@@ -258,7 +256,8 @@ public class AddressHelper
         Map declareArgs = getDeclareArgs(parent);
         MapAccessor argsMap = new MapAccessor(declareArgs);
         ExchangeNode node = new ExchangeNode();
-        node.setExchangeType(nodeProps.getString(TYPE));
+        node.setExchangeType(argsMap.getString(TYPE) == null?
+                             "topic":argsMap.getString(TYPE));
         node.setDeclareArgs(getQpidExchangeOptions(argsMap));
         fillInCommonNodeArgs(node,parent,argsMap);
         return node;
@@ -285,6 +284,11 @@ public class AddressHelper
         node.setBindings(getBindings(parent));
     }
     
+    /**
+     * if the type == queue x-declare args from the node props is used.
+     * if the type == exchange x-declare args from the link props is used
+     * else just create a default temp queue.
+     */
     public Node getSourceNode(int addressType)
     {
         if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
@@ -309,7 +313,19 @@ public class AddressHelper
         {            
             link.setDurable(linkProps.getBoolean(DURABLE)== null? false : linkProps.getBoolean(DURABLE));
             link.setName(linkProps.getString(NAME));
-            link.setCapacity(linkProps.getInt(CAPACITY));
+            
+            if (((Map)address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+            { 
+                MapAccessor capacityProps = new MapAccessor(
+                        (Map)((Map)address.getOptions().get(LINK)).get(CAPACITY));
+                link.setConsumerCapacity(capacityProps.getInt(CAPACITY_SOURCE));
+                link.setProducerCapacity(capacityProps.getInt(CAPACITY_TARGET));
+            }
+            else
+            {
+                link.setConsumerCapacity(linkProps.getInt(CAPACITY));  
+                link.setProducerCapacity(linkProps.getInt(CAPACITY));
+            }
             link.setFilter(linkProps.getString(FILTER));
             // so far filter type not used
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Fri May 28 03:18:52 2010
@@ -31,7 +31,8 @@ public class Link
     protected FilterType _filterType = FilterType.SUBJECT;
     protected boolean _isNoLocal;
     protected boolean _isDurable;
-    protected int _capacity = 0;
+    protected int _consumerCapacity = 0;
+    protected int _producerCapacity = 0;
     protected Node node;
     
     public Node getNode()
@@ -84,16 +85,26 @@ public class Link
         _isNoLocal = noLocal;
     }
 
-    public int getCapacity()
+    public int getConsumerCapacity()
     {
-        return _capacity;
+        return _consumerCapacity;
     }
 
-    public void setCapacity(int capacity)
+    public void setConsumerCapacity(int capacity)
     {
-        this._capacity = capacity;
+        _consumerCapacity = capacity;
+    }
+    
+    public int getProducerCapacity()
+    {
+        return _producerCapacity;
     }
 
+    public void setProducerCapacity(int capacity)
+    {
+        _producerCapacity = capacity;
+    }
+    
     public String getName()
     {
         return name;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org