You are viewing a plain text version of this content. The canonical link for it was not carried over into this plain text copy.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/11/04 23:49:53 UTC
svn commit: r1031321 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java BasicMessageConsumer.java
BasicMessageConsumer_0_10.java messaging/address/AddressHelper.java
messaging/address/Link.java
Author: rajith
Date: Thu Nov 4 22:49:52 2010
New Revision: 1031321
URL: http://svn.apache.org/viewvc?rev=1031321&view=rev
Log: (empty)
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Nov 4 22:49:52 2010
@@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQ
try
{
boolean isTopic;
-
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQ
preAcquire = !consumer.isNoConsume() &&
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
+
+ arguments.putAll(
+ (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Nov 4 22:49:52 2010
@@ -107,7 +107,7 @@ public abstract class BasicMessageConsum
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
*/
- private final boolean _exclusive;
+ protected boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
@@ -182,7 +182,7 @@ public abstract class BasicMessageConsum
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
-
+
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
_noConsume = noConsume;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Nov 4 22:49:52 2010
@@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 e
clearReceiveQueue();
}
}
+
+ public boolean isExclusive()
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Nov 4 22:49:52 2010
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
@@ -264,6 +265,7 @@ public class AddressHelper
public Link getLink()
{
Link link = new Link();
+ link.setSubscription(new Subscription());
if (linkProps != null)
{
link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
@@ -283,7 +285,8 @@ public class AddressHelper
.setProducerCapacity(capacityProps
.getInt(CAPACITY_TARGET) == null ? 0
: capacityProps.getInt(CAPACITY_TARGET));
- } else
+ }
+ else
{
int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
.getInt(CAPACITY);
@@ -292,6 +295,21 @@ public class AddressHelper
}
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
+
+ if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ {
+ Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+
+ link.getSubscription().setExclusive(exclusive);
+ }
}
return link;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Nov 4 22:49:52 2010
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.client.messaging.address.Node.QueueNode;
public class Link
@@ -34,6 +37,7 @@ public class Link
protected int _consumerCapacity = 0;
protected int _producerCapacity = 0;
protected Node node;
+ protected Subscription subscription;
public Node getNode()
{
@@ -114,4 +118,40 @@ public class Link
{
this.name = name;
}
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ this.subscription = subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private boolean exclusive = false;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org