You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/11/04 23:49:53 UTC

svn commit: r1031321 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java BasicMessageConsumer.java BasicMessageConsumer_0_10.java messaging/address/AddressHelper.java messaging/address/Link.java

Author: rajith
Date: Thu Nov  4 22:49:52 2010
New Revision: 1031321

URL: http://svn.apache.org/viewvc?rev=1031321&view=rev
Log: (empty)

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Nov  4 22:49:52 2010
@@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQ
         try
         {
             boolean isTopic;
-
+            Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+            
             if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
             {
                 isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQ
                 preAcquire = !consumer.isNoConsume() && 
                              (isTopic || consumer.getMessageSelector() == null || 
                               consumer.getMessageSelector().equals(""));
+                
+                arguments.putAll(
+                        (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
             }
             
-            Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+            
             getQpidSession().messageSubscribe
                 (queueName.toString(), String.valueOf(tag),
                  getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Nov  4 22:49:52 2010
@@ -107,7 +107,7 @@ public abstract class BasicMessageConsum
     /**
      * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
      */
-    private final boolean _exclusive;
+    protected boolean _exclusive;
 
     /**
      * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
@@ -182,7 +182,7 @@ public abstract class BasicMessageConsum
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
-
+        
         _synchronousQueue = new LinkedBlockingQueue();
         _autoClose = autoClose;
         _noConsume = noConsume;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Nov  4 22:49:52 2010
@@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 e
             clearReceiveQueue();
         }
     }
+    
+    public boolean isExclusive()
+    {
+        AMQDestination dest = this.getDestination();
+        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+        {
+            if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+            {
+                return true;
+            }
+            else
+            {                
+                return dest.getLink().getSubscription().isExclusive();
+            }
+        }
+        else
+        {
+            return _exclusive;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Nov  4 22:49:52 2010
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
@@ -264,6 +265,7 @@ public class AddressHelper
     public Link getLink()
     {
         Link link = new Link();
+        link.setSubscription(new Subscription());
         if (linkProps != null)
         {
             link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
@@ -283,7 +285,8 @@ public class AddressHelper
                         .setProducerCapacity(capacityProps
                                 .getInt(CAPACITY_TARGET) == null ? 0
                                 : capacityProps.getInt(CAPACITY_TARGET));
-            } else
+            } 
+            else
             {
                 int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
                         .getInt(CAPACITY);
@@ -292,6 +295,21 @@ public class AddressHelper
             }
             link.setFilter(linkProps.getString(FILTER));
             // so far filter type not used
+            
+            if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+            {   
+                Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+                
+                if (x_subscribe.containsKey(ARGUMENTS))
+                {
+                    link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+                }
+                
+                boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+                                    Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+                
+                link.getSubscription().setExclusive(exclusive);
+            }
         }
 
         return link;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1031321&r1=1031320&r2=1031321&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Nov  4 22:49:52 2010
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.client.messaging.address;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 
 public class Link
@@ -34,6 +37,7 @@ public class Link
     protected int _consumerCapacity = 0;
     protected int _producerCapacity = 0;
     protected Node node;
+    protected Subscription subscription;
     
     public Node getNode()
     {
@@ -114,4 +118,40 @@ public class Link
     {
         this.name = name;
     }
+    
+    public Subscription getSubscription()
+    {
+        return this.subscription;
+    }    
+ 
+    public void setSubscription(Subscription subscription)
+    {
+        this.subscription = subscription;
+    }   
+    
+    public static class Subscription
+    {
+        private Map<String,Object> args = new HashMap<String,Object>();        
+        private boolean exclusive = false;
+        
+        public Map<String, Object> getArgs()
+        {
+            return args;
+        }
+        
+        public void setArgs(Map<String, Object> args)
+        {
+            this.args = args;
+        }
+        
+        public boolean isExclusive()
+        {
+            return exclusive;
+        }
+        
+        public void setExclusive(boolean exclusive)
+        {
+            this.exclusive = exclusive;
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org