You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/02/03 18:31:05 UTC

svn commit: r906142 - in /qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ test/java/org/apache/qpid/test/unit/message/

Author: rajith
Date: Wed Feb  3 17:31:04 2010
New Revision: 906142

URL: http://svn.apache.org/viewvc?rev=906142&view=rev
Log:
This is related to QPID-1831
I added the patch attached to the above JIRA with modifications.
The modifications include integration with the address parser added by Rafi, and several refactoring and bug fixes to the original patch.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java Wed Feb  3 17:31:04 2010
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import java.net.URISyntaxException;
+
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.url.BindingURL;
 
@@ -38,6 +40,11 @@
     {
         super(binding);
     }
+
+    public AMQAnyDestination(String str) throws URISyntaxException
+    {
+        super(str);
+    }
     
     public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass,
                              AMQShortString routingKey,boolean isExclusive, 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Feb  3 17:31:04 2010
@@ -54,7 +54,6 @@
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
 
-import org.apache.configuration.ClientProperties;
 import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQProtocolException;
@@ -63,6 +62,7 @@
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicQosBody;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Wed Feb  3 17:31:04 2010
@@ -29,10 +29,10 @@
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
-import org.apache.configuration.ClientProperties;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Session;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Feb  3 17:31:04 2010
@@ -21,6 +21,9 @@
 package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 import javax.jms.Destination;
 import javax.naming.NamingException;
@@ -28,26 +31,35 @@
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
 
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
+import org.apache.qpid.client.messaging.address.QpidQueueOptions;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public abstract class AMQDestination implements Destination, Referenceable
 {
-    protected final AMQShortString _exchangeName;
+    private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class);
+    
+    protected AMQShortString _exchangeName;
 
-    protected final AMQShortString _exchangeClass;
+    protected AMQShortString _exchangeClass;
 
-    protected final boolean _isDurable;
+    protected boolean _isDurable;
 
-    protected final boolean _isExclusive;
+    protected boolean _isExclusive;
 
-    protected final boolean _isAutoDelete;
+    protected boolean _isAutoDelete;
 
-    private final boolean _browseOnly;
+    private boolean _browseOnly;
 
     private AMQShortString _queueName;
 
@@ -70,13 +82,107 @@
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
-
-    protected AMQDestination(String url) throws URISyntaxException
+    
+    // ----- Fields required to support new address syntax -------
+    
+    public enum DestSyntax {        
+      BURL,ADDR;
+      
+      public static DestSyntax getSyntaxType(String s)
+      {
+          if (("BURL").equals(s))
+          {
+              return BURL;
+          }
+          else if (("ADDR").equals(s))
+          {
+              return ADDR;
+          }
+          else
+          {
+              throw new IllegalArgumentException("Invalid Destination Syntax Type" +
+                                                 " should be one of {BURL|ADDR}");
+          }
+      }
+    } 
+    
+    public enum AddressOption { 
+      ALWAYS, NEVER, SENDER, RECEIVER; 
+        
+      public static AddressOption getOption(String str)
+      {
+          if ("always".equals(str)) return ALWAYS;
+          else if ("never".equals(str)) return NEVER;
+          else if ("sender".equals(str)) return SENDER;
+          else if ("receiver".equals(str)) return RECEIVER;
+          else throw new IllegalArgumentException(str + " is not an allowed value");
+      }
+    }
+    
+    public enum FilterType { SQL92, XQUERY, SUBJECT }
+    
+    protected static DestSyntax defaultDestSyntax;
+    
+    protected DestSyntax _destSyntax;
+
+    protected Address _address;
+    protected String _name;
+    protected String _subject;
+    protected AddressOption _create = AddressOption.NEVER;
+    protected AddressOption _assert = AddressOption.ALWAYS;
+    protected AddressOption _delete = AddressOption.NEVER;
+    
+    protected String _filter;
+    protected FilterType _filterType = FilterType.SUBJECT;
+    protected boolean _isNoLocal;
+    protected int _nodeType = QUEUE_TYPE;
+    protected String _alternateExchange;
+    protected QpidQueueOptions _queueOptions;
+    protected QpidExchangeOptions _exchangeOptions;
+    protected List<Binding> _bindings = new ArrayList<Binding>();
+    // ----- / Fields required to support new address syntax -------
+    
+    static
+    {
+        defaultDestSyntax = DestSyntax.getSyntaxType(
+                     System.getProperty(ClientProperties.DEST_SYNTAX,
+                                        DestSyntax.BURL.toString()));
+    }
+    
+    protected AMQDestination(Address address)
+    {
+        this._address = address;
+        getInfoFromAddress();
+        _destSyntax = DestSyntax.ADDR;
+        _logger.info("Based on " + address + " the selected destination syntax is " + _destSyntax);
+    }
+    
+    protected AMQDestination(String str) throws URISyntaxException
+    {
+        if (str.startsWith("BURL:") || 
+           (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+        {            
+            _destSyntax = DestSyntax.BURL;
+            getInfoFromBindingURL(new AMQBindingURL(str));            
+        }
+        else
+        {
+            _destSyntax = DestSyntax.ADDR;
+            this._address = createAddressFromString(str);
+            getInfoFromAddress();
+        }
+        _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax);
+    }
+    
+    //retained for legacy support
+    protected AMQDestination(BindingURL binding)
     {
-        this(new AMQBindingURL(url));
+        getInfoFromBindingURL(binding);
+        _destSyntax = DestSyntax.BURL;
+        _logger.info("Based on " + binding + " the selected destination syntax is " + _destSyntax);
     }
 
-    protected AMQDestination(BindingURL binding)
+    protected void getInfoFromBindingURL(BindingURL binding)
     {
         _exchangeName = binding.getExchangeName();
         _exchangeClass = binding.getExchangeClass();
@@ -153,7 +259,9 @@
         _queueName = queueName;
         _isDurable = isDurable;
         _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+        _destSyntax = DestSyntax.BURL;
         _browseOnly = browseOnly;
+        _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
     }
 
     public AMQShortString getEncodedName()
@@ -243,7 +351,14 @@
 
     public String toString()
     {
-        return toURL();
+        if (_destSyntax == DestSyntax.BURL)
+        {
+            return toURL();
+        }
+        else
+        {
+            return _address.toString();
+        }
 
     }
 
@@ -424,8 +539,8 @@
     public int hashCode()
     {
         int result;
-        result = _exchangeName.hashCode();
-        result = 29 * result + _exchangeClass.hashCode();
+        result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode();
+        result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode());
         //result = 29 * result + _destinationName.hashCode();
         if (_queueName != null)
         {
@@ -513,6 +628,163 @@
         }
     }
 
+    // ----- new address syntax -----------
+    public static class Binding
+    {
+        String exchange;
+        String bindingKey;
+        Map<String,Object> args;
+        
+        public Binding(String exchange,String bindingKey,Map<String,Object> args)
+        {
+            this.exchange = exchange;
+            this.bindingKey = bindingKey;
+            this.args = args;
+        }
+        
+        public String getExchange() 
+        {
+            return exchange;
+        }
+
+        public String getBindingKey() 
+        {
+            return bindingKey;
+        }
+        
+        public Map<String, Object> getArgs() 
+        {
+            return args;
+        }
+    }
+    
+    public Address getAddress() {
+        return _address;
+    }
+    
+    public String getName() {
+        return _name;
+    }
+
+    public String getSubject() {
+        return _subject;
+    }
+
+    public AddressOption getCreate() {
+        return _create;
+    }
+
+    public AddressOption getAssert() {
+        return _assert;
+    }
+
+    public AddressOption getDelete() {
+        return _delete;
+    }
+
+    public String getFilter() {
+        return _filter;
+    }
+
+    public FilterType getFilterType() {
+        return _filterType;
+    }
+
+    public boolean isNoLocal() {
+        return _isNoLocal;
+    }
+
+    public int getNodeType() {
+        return _nodeType;
+    }
+
+    public QpidQueueOptions getQueueOptions() {
+        return _queueOptions;
+    }
+
+    public List<Binding> getBindings() {
+        return _bindings;
+    }
+
+    public void addBinding(Binding binding) {
+        this._bindings.add(binding);
+    }
+    
+    public DestSyntax getDestSyntax() {
+        return _destSyntax;
+    }
+    
+    public QpidExchangeOptions getExchangeOptions() {
+        return _exchangeOptions;
+    }
+
+    public String getAlternateExchange() {
+        return _alternateExchange;
+    }
+
+    public void setAlternateExchange(String alternateExchange) {
+        this._alternateExchange = alternateExchange;
+    }
+    
+    public void setExchangeName(AMQShortString name)
+    {
+        this._exchangeName = name;
+    }
+    
+    public void setExchangeClass(AMQShortString type)
+    {
+        this._exchangeClass = type;
+    }
+    
+    public void setRoutingKey(AMQShortString rk)
+    {
+        this._routingKey = rk;
+    }
+    
+    private Address createAddressFromString(String str)
+    {
+        if (str.startsWith("ADDR:"))
+        {
+            str = str.substring(str.indexOf(':')+1,str.length());
+        }
+       return Address.parse(str);
+    }
+    
+    private void getInfoFromAddress()
+    {
+        _name = _address.getName();
+        _subject = _address.getSubject();
+        
+        AddressHelper addrHelper = new AddressHelper(_address);
+        
+        _create = addrHelper.getCreate() != null ?
+                 AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER;
+                
+        _assert = addrHelper.getAssert() != null ?
+                 AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS;
+
+        _delete = addrHelper.getDelete() != null ?
+                 AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER;
+                        
+        _filter = addrHelper.getFilter(); 
+        _isNoLocal = addrHelper.isNoLocal();
+        _isDurable = addrHelper.isDurable();
+        _isAutoDelete = addrHelper.isAutoDelete();
+        _isExclusive = addrHelper.isExclusive();
+        _browseOnly = addrHelper.isBrowseOnly();
+        
+        _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")?
+                   QUEUE_TYPE : TOPIC_TYPE;
+        
+        _alternateExchange = addrHelper.getAltExchange();
+        
+        _queueOptions = addrHelper.getQpidQueueOptions();
+        _exchangeOptions = addrHelper.getQpidExchangeOptions();
+        _bindings = addrHelper.getBindings();
+    }
+    
+    // ----- / new address syntax -----------    
+
     public boolean isBrowseOnly()
     {
         return _browseOnly;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Feb  3 17:31:04 2010
@@ -28,6 +28,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -203,7 +205,7 @@
 
     protected final boolean DECLARE_EXCHANGES =
         Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-
+    
     protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
 
     /** System property to enable strict AMQP compliance. */
@@ -368,7 +370,7 @@
     private boolean _dirty;
     /** Has failover occured on this session with outstanding actions to commit? */
     private boolean _failedOverDirty;
-
+    
     private static final class FlowControlIndicator
     {
         private volatile boolean _flowControl = true;
@@ -2095,7 +2097,7 @@
             if (tempDest.getSession() != this)
             {
                 _logger.debug("destination is on different session");
-                throw new JMSException("Cannot consume from a temporary destination created onanother session");
+                throw new JMSException("Cannot consume from a temporary destination created on another session");
             }
 
             if (tempDest.isDeleted())
@@ -2301,7 +2303,7 @@
                         checkNotClosed();
                         long producerId = getNextProducerId();
                         P producer = createMessageProducer(destination, mandatory,
-                                                                              immediate, waitUntilSent, producerId);
+                                                           immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2535,15 +2537,23 @@
 
         AMQProtocolHandler protocolHandler = getProtocolHandler();
 
-        if (DECLARE_EXCHANGES)
+        if (amqd.getDestSyntax() == DestSyntax.ADDR)
         {
-            declareExchange(amqd, protocolHandler, nowait);
+            handleAddressBasedDestination(amqd,true,nowait);            
         }
-
-        if (DECLARE_QUEUES || amqd.isNameRequired())
+        else
         {
-            declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+            if (DECLARE_EXCHANGES)
+            {
+                declareExchange(amqd, protocolHandler, nowait);
+            }
+    
+            if (DECLARE_QUEUES || amqd.isNameRequired())
+            {
+                declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+            }
         }
+        
         AMQShortString queueName = amqd.getAMQQueueName();
 
         // store the consumer queue name
@@ -2589,6 +2599,10 @@
         }
     }
 
+    public abstract void handleAddressBasedDestination(AMQDestination dest, 
+                                                       boolean isConsumer,
+                                                       boolean noWait) throws AMQException;
+    
     private void registerProducer(long producerId, MessageProducer producer)
     {
         _producers.put(new Long(producerId), producer);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Feb  3 17:31:04 2010
@@ -17,33 +17,60 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import java.lang.ref.WeakReference;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.FiledTableSupport;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.util.Serial;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageCreditUnit;
 import org.apache.qpid.transport.MessageFlowMode;
 import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.QueueQueryResult;
 import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.util.Serial;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,7 +150,7 @@
     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
     private TimerTask flushTask = null;
     private RangeSet unacked = new RangeSet();
-    private int unackedCount = 0;
+    private int unackedCount = 0;    
 
     /**
      * USed to store the range of in tx messages
@@ -305,18 +332,41 @@
                               final AMQDestination destination, final boolean nowait)
             throws AMQException, FailoverException
     {
-        Map args = FiledTableSupport.convertToMap(arguments);
-        // this is there only becasue the broker may expect a value for x-match
-        if( ! args.containsKey("x-match") )
+        if (destination.getDestSyntax() == DestSyntax.BURL)
         {
-            args.put("x-match", "any");
+            Map args = FiledTableSupport.convertToMap(arguments);
+            // this is there only becasue the broker may expect a value for x-match
+            if( ! args.containsKey("x-match") )
+            {
+                args.put("x-match", "any");
+            }
+    
+            for (AMQShortString rk: destination.getBindingKeys())
+            {
+                _logger.debug("Binding queue : " + queueName.toString() + 
+                              " exchange: " + exchangeName.toString() + 
+                              " using binding key " + rk.asString());
+                getQpidSession().exchangeBind(queueName.toString(), 
+                                              exchangeName.toString(), 
+                                              rk.toString(), 
+                                              args);
+            }
         }
-
-        for (AMQShortString rk: destination.getBindingKeys())
+        else
         {
-            _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
-            getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+            for (Binding binding: destination.getBindings())
+            {
+                _logger.debug("Binding queue : " + queueName.toString() + 
+                              " exchange: " + binding.getExchange() + 
+                              " using binding key " + binding.getBindingKey() + 
+                              " with args " + printMap(binding.getArgs()));
+                getQpidSession().exchangeBind(queueName.toString(), 
+                                              binding.getExchange(),
+                                              binding.getBindingKey(),
+                                              binding.getArgs()); 
+            }
         }
+        
         if (!nowait)
         {
             // We need to sync so that we get notify of an error.
@@ -470,8 +520,7 @@
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
     throws JMSException
     {
-        String rk = null;
-        boolean res;
+        String rk = null;        
         if (bindingKeys != null && bindingKeys.length>0)
         {
             rk = bindingKeys[0].toString();
@@ -480,18 +529,32 @@
         {
             rk = routingKey.toString();
         }
-
+        return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+    }
+    
+    public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
+    throws JMSException
+    {
+        boolean res;
         ExchangeBoundResult bindingQueryResult =
-            getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
+            getQpidSession().exchangeBound(exchangeName,queueName, bindingKey, args).get();
 
-        if (rk == null)
+        if (bindingKey == null)
         {
             res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
         }
         else
-        {
-            res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
-                    .getQueueNotMatched());
+        {   
+            if (args == null)
+            {
+                res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+                        .getQueueNotMatched());
+            }
+            else
+            {
+                res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+                        .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
+            }
         }
         return res;
     }
@@ -566,15 +629,26 @@
     /**
      * creates an exchange if it does not already exist
      */
-    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
-                                    final AMQProtocolHandler protocolHandler, final boolean nowait)
+    public void sendExchangeDeclare(final AMQShortString name,
+            final AMQShortString type,
+            final AMQProtocolHandler protocolHandler, final boolean nowait)
             throws AMQException, FailoverException
     {
-        getQpidSession().exchangeDeclare(name.toString(),
-                                        type.toString(),
-                                        null,
-                                        null,
-                                        name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
+        sendExchangeDeclare(name.asString(), type.asString(), null, null,
+                nowait);
+    }
+
+    public void sendExchangeDeclare(final String name, final String type,
+            final String alternateExchange, final Map<String, Object> args,
+            final boolean nowait) throws AMQException
+    {
+        getQpidSession().exchangeDeclare(
+                name,
+                type,
+                alternateExchange,
+                args,
+                name.toString().startsWith("amq.") ? Option.PASSIVE
+                        : Option.NONE);
         // We need to sync so that we get notify of an error.
         if (!nowait)
         {
@@ -598,28 +672,35 @@
      */
     public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
                                                final boolean noLocal, final boolean nowait)
-            throws AMQException, FailoverException
+            throws AMQException
     {
-        AMQShortString res;
+        AMQShortString queueName;
         if (amqd.getAMQQueueName() == null)
         {
             // generate a name for this queue
-            res = new AMQShortString("TempQueue" + UUID.randomUUID());
+            queueName = new AMQShortString("TempQueue" + UUID.randomUUID());
+            amqd.setQueueName(queueName);
         }
         else
         {
-            res = amqd.getAMQQueueName();
+            queueName = amqd.getAMQQueueName();
         }
-        Map<String,Object> arguments = null;
-        if (noLocal)
-        {
-            arguments = new HashMap<String,Object>();
+        
+        Map<String,Object> arguments = new HashMap<String,Object>();
+        if (noLocal || amqd.isNoLocal())
+        {            
             arguments.put("no-local", true);
         }
-        getQpidSession().queueDeclare(res.toString(), null, arguments,
+        
+        if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+        {
+            arguments.putAll(amqd.getQueueOptions());
+        }
+        
+        getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments,
                                       amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                       amqd.isDurable() ? Option.DURABLE : Option.NONE,
-                                      !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+                                      amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);                
         // passive --> false
         if (!nowait)
         {
@@ -627,7 +708,7 @@
             getQpidSession().sync();
             getCurrentException();
         }
-        return res;
+        return queueName;
     }
 
     /**
@@ -934,5 +1015,136 @@
     {
         return AMQMessageDelegateFactory.FACTORY_0_10;
     }
-
+    
+    public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+    {
+        boolean match = true;
+        ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get();
+        match = !result.getNotFound();
+        
+        if (match && assertNode)
+        {
+            match =  (result.getDurable() == dest.isDurable()) &&
+                     (dest.getExchangeClass().asString().equals(result.getType())) &&
+                     (matchProps(result.getArguments(),dest.getQueueOptions()));
+        }
+        if (match)
+        {
+            dest.setExchangeClass(new AMQShortString(result.getType()));
+        }
+        
+        return match;
+    }
+    
+    public boolean isQueueExist(AMQDestination dest,boolean assertNode)
+    {
+        boolean match = true;
+        QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get();
+        match = dest.getName().equals(result.getQueue());
+        
+        if (match && assertNode)
+        {
+            match = (result.getDurable() == dest.isDurable()) && 
+                     (result.getAutoDelete() == dest.isAutoDelete()) &&
+                     (result.getExclusive() == dest.isExclusive()) &&
+                     (matchProps(result.getArguments(),dest.getQueueOptions()));
+        }
+        
+        return match;
+    }
+    
+    private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
+    {
+        boolean match = true;
+        for (String key: source.keySet())
+        {
+            match = target.containsKey(key) && 
+                    target.get(key).equals(source.get(key));
+            
+            if (!match) return match;
+        }
+        
+        return match;
+    }
+
+    public void handleAddressBasedDestination(AMQDestination dest, 
+                                              boolean isConsumer,
+                                              boolean noWait) throws AMQException
+    {
+        boolean noLocal = dest.isNoLocal();
+        boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || 
+                             (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
+                             (!isConsumer && dest.getAssert() == AddressOption.SENDER);
+                    
+        
+        if (isExchangeExist(dest,assertNode))
+        {
+            dest.setExchangeName(new AMQShortString(dest.getName()));
+            dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+            if (isConsumer)
+            {
+                dest.setQueueName(null);
+                dest.addBinding(new Binding(dest.getName(),
+                                                 dest.getSubject(),
+                                                 null));
+            }
+        }
+        else if (isQueueExist(dest,assertNode))
+        {
+            dest.setQueueName(new AMQShortString(dest.getName()));
+            dest.setExchangeName(new AMQShortString(""));
+            dest.setExchangeClass(new AMQShortString(""));
+            dest.setRoutingKey(dest.getAMQQueueName());
+        }
+        else if (dest.getCreate() == AddressOption.ALWAYS ||
+                 dest.getCreate() == AddressOption.RECEIVER && isConsumer ||
+                 dest.getCreate() == AddressOption.SENDER && !isConsumer)
+        {
+            if (dest.getNodeType() == AMQDestination.QUEUE_TYPE)
+            {
+                dest.setQueueName(new AMQShortString(dest.getName()));
+                dest.setExchangeName(new AMQShortString(""));
+                dest.setExchangeClass(new AMQShortString(""));
+                dest.setRoutingKey(dest.getAMQQueueName());
+            }
+            else
+            {
+                dest.setQueueName(null);
+                dest.setExchangeName(new AMQShortString(dest.getName()));
+                dest.setExchangeClass(dest.getExchangeClass() == null? 
+                                      ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass());  
+                dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+                dest.addBinding(new Binding(dest.getName(),
+                                                 dest.getSubject(),
+                                                 null));
+                
+                sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(),
+                                    dest.getAlternateExchange(), dest.getExchangeOptions(),false);
+                
+            }
+            
+            send0_10QueueDeclare(dest,null,noLocal,noWait);
+        }
+        else
+        {
+            throw new AMQException("The name supplied in the address doesn't resolve to an exchange or a queue");
+        }
+    }
+    
+    /** This should be moved to a suitable utility class */
+    private String printMap(Map<String,Object> map)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("<");
+        if (map != null)
+        {
+            for(String key : map.keySet())
+            {
+                sb.append(key).append(" = ").append(map.get(key)).append(" ");
+            }
+        }
+        sb.append(">");
+        return sb.toString();
+    }
+    
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Feb  3 17:31:04 2010
@@ -590,4 +590,11 @@
         declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
     }
 
+    public void handleAddressBasedDestination(AMQDestination dest, 
+                                              boolean isConsumer,
+                                              boolean noWait) throws AMQException
+    {
+        throw new UnsupportedOperationException("The new addressing based sytanx is "
+                + "not supported for AMQP 0-8/0-9 versions");
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Wed Feb  3 17:31:04 2010
@@ -17,34 +17,39 @@
  */
 package org.apache.qpid.client;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+
 import java.nio.ByteBuffer;
+import java.util.UUID;
 
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.DeliveryMode;
 
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Option;
 import org.apache.qpid.util.Strings;
-import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a 0_10 message producer.
  */
 public class BasicMessageProducer_0_10 extends BasicMessageProducer
 {
+    private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class);
     private byte[] userIDBytes;
 
     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -59,12 +64,27 @@
 
     void declareDestination(AMQDestination destination)
     {
-        String name = destination.getExchangeName().toString();
-        ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
-            (name,
-             destination.getExchangeClass().toString(),
-             null, null,
-             name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+        if (destination.getDestSyntax() == DestSyntax.BURL)
+        {
+            String name = destination.getExchangeName().toString();
+            ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+                (name,
+                 destination.getExchangeClass().toString(),
+                 null, null,
+                 name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+        }
+        else
+        {       
+            try
+            {
+                getSession().handleAddressBasedDestination(destination,false,false);
+            }
+            catch(Exception e)
+            {
+                // Idealy this should be thrown to the JMS layer.
+                _logger.warn("Exception occured while verifying destination",e);
+            }
+        }
     }
 
     //--- Overwritten methods
@@ -136,7 +156,7 @@
             deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
             message.setJMSPriority(priority);
         }
-        String exchangeName = destination.getExchangeName().toString();
+        String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString();
         if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
         {
             deliveryProp.setExchange(exchangeName);
@@ -166,7 +186,8 @@
             org.apache.mina.common.ByteBuffer data = message.getData();
             ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
             
-            ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+            ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), 
+                                MessageAcceptMode.NONE,
                                 MessageAcquireMode.PRE_ACQUIRED,
                                 new Header(deliveryProp, messageProps),
                     buffer, sync ? SYNC : NONE);

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=906142&r1=906141&r2=906142&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Wed Feb  3 17:31:04 2010
@@ -175,4 +175,12 @@
     public void sync()
     {
     }
+
+    public void handleAddressBasedDestination(AMQDestination dest, 
+                                              boolean isConsumer, 
+                                              boolean noWait) throws AMQException
+    {
+        throw new UnsupportedOperationException("The new addressing based sytanx is "
+                + "not supported for AMQP 0-8/0-9 versions");
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org