You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/05/28 05:18:53 UTC
svn commit: r949083 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQDestination.java AMQSession_0_10.java BasicMessageConsumer_0_10.java
messaging/address/AddressHelper.java messaging/address/Link.java
Author: rajith
Date: Fri May 28 03:18:52 2010
New Revision: 949083
URL: http://svn.apache.org/viewvc?rev=949083&view=rev
Log:
1. Capacity can now be specified as " capacity : {source: 5, target 10}" in addition to "capacity:10" where both source and target is set to 10.
2. If the exchange type if direct and no subject is set, then the routing_key is set to "" instead of throwing an exception.
3. Added a fix to infer the exchange type if specified in the x-declares section.
4. The link can now specify an optional 'name' parameter which will be used as the queue name if present.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri May 28 03:18:52 2010
@@ -139,7 +139,7 @@ public abstract class AMQDestination imp
protected Node _targetNode;
protected Node _sourceNode;
protected Link _targetLink;
- protected Link _sourceLink;
+ protected Link _link;
// ----- / Fields required to support new address syntax -------
static
@@ -739,14 +739,14 @@ public abstract class AMQDestination imp
_sourceNode = node;
}
- public Link getSourceLink()
+ public Link getLink()
{
- return _sourceLink;
+ return _link;
}
- public void setSourceLink(Link link)
+ public void setLink(Link link)
{
- _sourceLink = link;
+ _link = link;
}
public void setExchangeName(AMQShortString name)
@@ -792,7 +792,7 @@ public abstract class AMQDestination imp
_addressType = _addrHelper.getTargetNodeType();
_targetNode = _addrHelper.getTargetNode(_addressType);
_sourceNode = _addrHelper.getSourceNode(_addressType);
- _sourceLink = _addrHelper.getLink();
+ _link = _addrHelper.getLink();
}
// This method is needed if we didn't know the node type at the beginning.
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri May 28 03:18:52 2010
@@ -612,9 +612,9 @@ public class AMQSession_0_10 extends AMQ
{
long capacity = 0;
if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getSourceLink().getCapacity() > 0)
+ destination.getLink().getConsumerCapacity() > 0)
{
- capacity = destination.getSourceLink().getCapacity();
+ capacity = destination.getLink().getConsumerCapacity();
}
else if (prefetch())
{
@@ -1229,10 +1229,10 @@ public class AMQSession_0_10 extends AMQ
dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY);
dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString());
}
- else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+ else
{
- throw new AMQException("If sending to an exchange of type direct," +
- " a valid subject must be specified");
+ dest.setRoutingKey(new AMQShortString(""));
+ dest.setSubject("");
}
}
}
@@ -1242,9 +1242,10 @@ public class AMQSession_0_10 extends AMQ
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
{
- // can name : my-queue be used in x-declare?
- // if so set it to dest queue name
- // if (dest.getQueueName() == null) { dest.setName(node.getName()) }
+ if (dest.getLink() != null && dest.getLink().getName() != null)
+ {
+ dest.setQueueName(new AMQShortString(dest.getLink().getName()));
+ }
send0_10QueueDeclare(dest,null,false,false);
}
node.addBinding(new Binding(dest.getAddressName(),
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri May 28 03:18:52 2010
@@ -106,9 +106,9 @@ public class BasicMessageConsumer_0_10 e
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getSourceLink().getCapacity() > 0)
+ destination.getLink().getConsumerCapacity() > 0)
{
- capacity = destination.getSourceLink().getCapacity();
+ capacity = destination.getLink().getConsumerCapacity();
}
else if (getSession().prefetch())
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Fri May 28 03:18:52 2010
@@ -58,6 +58,8 @@ public class AddressHelper
public static final String BINDINGS = "bindings";
public static final String BROWSE_ONLY = "browse";
public static final String CAPACITY = "capacity";
+ public static final String CAPACITY_SOURCE = "source";
+ public static final String CAPACITY_TARGET = "target";
public static final String NAME = "name";
public static final String EXCHANGE = "exchange";
public static final String QUEUE = "queue";
@@ -220,11 +222,7 @@ public class AddressHelper
{
return AMQDestination.QUEUE_TYPE;
}
- else if ((nodeProps.getString(TYPE).equals("topic") ||
- nodeProps.getString(TYPE).equals("direct") ||
- nodeProps.getString(TYPE).equals("fanout") ||
- nodeProps.getString(TYPE).equals("match") ||
- nodeProps.getString(TYPE).equals("xml")) )
+ else if (nodeProps.getString(TYPE).equals("topic"))
{
return AMQDestination.TOPIC_TYPE;
}
@@ -258,7 +256,8 @@ public class AddressHelper
Map declareArgs = getDeclareArgs(parent);
MapAccessor argsMap = new MapAccessor(declareArgs);
ExchangeNode node = new ExchangeNode();
- node.setExchangeType(nodeProps.getString(TYPE));
+ node.setExchangeType(argsMap.getString(TYPE) == null?
+ "topic":argsMap.getString(TYPE));
node.setDeclareArgs(getQpidExchangeOptions(argsMap));
fillInCommonNodeArgs(node,parent,argsMap);
return node;
@@ -285,6 +284,11 @@ public class AddressHelper
node.setBindings(getBindings(parent));
}
+ /**
+ * if the type == queue x-declare args from the node props is used.
+ * if the type == exchange x-declare args from the link props is used
+ * else just create a default temp queue.
+ */
public Node getSourceNode(int addressType)
{
if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
@@ -309,7 +313,19 @@ public class AddressHelper
{
link.setDurable(linkProps.getBoolean(DURABLE)== null? false : linkProps.getBoolean(DURABLE));
link.setName(linkProps.getString(NAME));
- link.setCapacity(linkProps.getInt(CAPACITY));
+
+ if (((Map)address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ {
+ MapAccessor capacityProps = new MapAccessor(
+ (Map)((Map)address.getOptions().get(LINK)).get(CAPACITY));
+ link.setConsumerCapacity(capacityProps.getInt(CAPACITY_SOURCE));
+ link.setProducerCapacity(capacityProps.getInt(CAPACITY_TARGET));
+ }
+ else
+ {
+ link.setConsumerCapacity(linkProps.getInt(CAPACITY));
+ link.setProducerCapacity(linkProps.getInt(CAPACITY));
+ }
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=949083&r1=949082&r2=949083&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Fri May 28 03:18:52 2010
@@ -31,7 +31,8 @@ public class Link
protected FilterType _filterType = FilterType.SUBJECT;
protected boolean _isNoLocal;
protected boolean _isDurable;
- protected int _capacity = 0;
+ protected int _consumerCapacity = 0;
+ protected int _producerCapacity = 0;
protected Node node;
public Node getNode()
@@ -84,16 +85,26 @@ public class Link
_isNoLocal = noLocal;
}
- public int getCapacity()
+ public int getConsumerCapacity()
{
- return _capacity;
+ return _consumerCapacity;
}
- public void setCapacity(int capacity)
+ public void setConsumerCapacity(int capacity)
{
- this._capacity = capacity;
+ _consumerCapacity = capacity;
+ }
+
+ public int getProducerCapacity()
+ {
+ return _producerCapacity;
}
+ public void setProducerCapacity(int capacity)
+ {
+ _producerCapacity = capacity;
+ }
+
public String getName()
{
return name;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org