You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [36/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Feb 28 16:14:30 2013
@@ -50,29 +50,28 @@ public class BasicMessageProducer_0_8 ex
     BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
             AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
     {
-        super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+        super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
     }
 
     void declareDestination(AMQDestination destination)
     {
-
-        final MethodRegistry methodRegistry = getSession().getMethodRegistry();
-        ExchangeDeclareBody body =
+        if(getSession().isDeclareExchanges())
+        {
+            final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+            ExchangeDeclareBody body =
                 methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
                                                          destination.getExchangeName(),
                                                          destination.getExchangeClass(),
                                                          destination.getExchangeName().toString().startsWith("amq."),
-                                                         false,
-                                                         false,
-                                                         false,
+                                                         destination.isExchangeDurable(),
+                                                         destination.isExchangeAutoDelete(),
+                                                         destination.isExchangeInternal(),
                                                          true,
                                                          null);
-        // Declare the exchange
-        // Note that the durable and internal arguments are ignored since passive is set to false
-
-        AMQFrame declare = body.generateFrame(getChannelId());
+            AMQFrame declare = body.generateFrame(getChannelId());
 
-        getProtocolHandler().writeFrame(declare);
+            getConnection().getProtocolHandler().writeFrame(declare);
+        }
     }
 
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
@@ -172,7 +171,7 @@ public class BasicMessageProducer_0_8 ex
             throw jmse;
         }
 
-        getProtocolHandler().writeFrame(compositeFrame);
+        getConnection().getProtocolHandler().writeFrame(compositeFrame);
     }
 
     /**
@@ -234,4 +233,9 @@ public class BasicMessageProducer_0_8 ex
         return frameCount;
     }
 
+    @Override
+    public AMQSession_0_8 getSession()
+    {
+        return (AMQSession_0_8) super.getSession();
+    }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Thu Feb 28 16:14:30 2013
@@ -88,7 +88,7 @@ public class XASessionImpl extends AMQSe
      */
     public void createSession()
     {
-        _qpidDtxSession = getQpidConnection().createSession(0);
+        _qpidDtxSession = getQpidConnection().createSession(0,true);
         _qpidDtxSession.setSessionListener(this);
         _qpidDtxSession.dtxSelect();
     }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.handler;
 
+import java.nio.ByteBuffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,8 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
 
 public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
 {
@@ -91,18 +95,24 @@ public class ConnectionCloseMethodHandle
         }
         finally
         {
+            Sender<ByteBuffer> sender = session.getSender();
 
             if (error != null)
             {
                 session.notifyError(error);
-            }            
-
-            // Close the protocol Session, including any open TCP connections 
-            session.closeProtocolSession();
+            }
 
-            // Closing the session should not introduce a race condition as this thread will continue to propgate any
-            // exception in to the exceptionCaught method of the SessionHandler.
-            // Any sessionClosed event should occur after this.
+            // Close the open TCP connection
+            try
+            {
+                sender.close();
+            }
+            catch(TransportException e)
+            {
+                //Ignore, they are already logged by the Sender and this
+                //is a connection-close being processed by the IoReceiver
+                //which will as it closes initiate failover if necessary.
+            }
         }
     }
 

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Feb 28 16:14:30 2013
@@ -91,6 +91,7 @@ public class AMQMessageDelegate_0_10 ext
 
     private MessageProperties _messageProps;
     private DeliveryProperties _deliveryProps;
+    private String _messageID;
 
     protected AMQMessageDelegate_0_10()
     {
@@ -171,8 +172,12 @@ public class AMQMessageDelegate_0_10 ext
 
     public String getJMSMessageID() throws JMSException
     {
-        UUID id = _messageProps.getMessageId();
-        return id == null ? null : "ID:" + id;
+        if (_messageID == null && _messageProps.getMessageId() != null)
+        {
+            UUID id = _messageProps.getMessageId();
+            _messageID = "ID:" + id;
+        }
+        return _messageID;
     }
 
     public void setJMSMessageID(String messageId) throws JMSException
@@ -185,14 +190,7 @@ public class AMQMessageDelegate_0_10 ext
         {
             if(messageId.startsWith("ID:"))
             {
-                try
-                {
-                    _messageProps.setMessageId(UUID.fromString(messageId.substring(3)));
-                }
-                catch(IllegalArgumentException ex)
-                {
-                    throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID");
-                }
+                _messageID = messageId;
             }
             else
             {
@@ -201,6 +199,7 @@ public class AMQMessageDelegate_0_10 ext
         }
     }
 
+    /* Used by the internal implementation */
     public void setJMSMessageID(UUID messageId) throws JMSException
     {
         if(messageId == null)
@@ -344,7 +343,7 @@ public class AMQMessageDelegate_0_10 ext
                int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
                if (type == AMQDestination.QUEUE_TYPE)
                {
-                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
+                   ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
                }
                else
                {

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Feb 28 16:14:30 2013
@@ -196,7 +196,14 @@ public class JMSObjectMessage extends Ab
         if (data != null && data.hasRemaining())
         {
             ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data));
-            result = (Serializable) in.readObject();
+            try
+            {
+                result = (Serializable) in.readObject();
+            }
+            finally
+            {
+                in.close();
+            }
         }
         return result;
     }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Feb 28 16:14:30 2013
@@ -44,7 +44,12 @@ public class JMSStreamMessage extends Ab
 
     }
 
+    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws AMQException
+    {
+        super(delegateFactory, data!=null);
+        _typedBytesContentWriter = new TypedBytesContentWriter();
 
+    }
 
     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Feb 28 16:14:30 2013
@@ -66,6 +66,7 @@ public class MessageFactoryRegistry
         mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
         mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
         mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
+        mf.registerFactory(AMQPEncodedListMessage.MIME_TYPE, new AMQPEncodedListMessageFactory());
         mf.registerFactory(null, mf._default);
 
         return mf;

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Feb 28 16:14:30 2013
@@ -20,21 +20,20 @@
  */
 package org.apache.qpid.client.messaging.address;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQDestination.Binding;
 import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.messaging.address.Link.Subscription;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
 import org.apache.qpid.configuration.Accessor;
 import org.apache.qpid.configuration.Accessor.MapAccessor;
 import org.apache.qpid.messaging.Address;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Utility class for extracting information from the address class
  */
@@ -68,58 +67,56 @@ public class AddressHelper
     public static final String ARGUMENTS = "arguments";
     public static final String RELIABILITY = "reliability";
 
-    private Address address;
-    private Accessor addressProps;
-    private Accessor nodeProps;
-    private Accessor linkProps;
+    private Address _address;
+    private Accessor _addressPropAccess;
+    private Accessor _nodePropAccess;
+    private Accessor _linkPropAccess;
+    private Map _addressPropMap;
+    private Map _nodePropMap;
+    private Map _linkPropMap;
 
     public AddressHelper(Address address)
     {
-        this.address = address;
-        addressProps = new MapAccessor(address.getOptions());
-        Map node_props = address.getOptions() == null
+        this._address = address;
+        this._addressPropMap = address.getOptions();
+        this._addressPropAccess = new MapAccessor(_addressPropMap);
+        this._nodePropMap = address.getOptions() == null
                 || address.getOptions().get(NODE) == null ? null
                 : (Map) address.getOptions().get(NODE);
 
-        if (node_props != null)
+        if (_nodePropMap != null)
         {
-            nodeProps = new MapAccessor(node_props);
+            _nodePropAccess = new MapAccessor(_nodePropMap);
         }
 
-        Map link_props = address.getOptions() == null
+        this._linkPropMap = address.getOptions() == null
                 || address.getOptions().get(LINK) == null ? null
                 : (Map) address.getOptions().get(LINK);
 
-        if (link_props != null)
+        if (_linkPropMap != null)
         {
-            linkProps = new MapAccessor(link_props);
+            _linkPropAccess = new MapAccessor(_linkPropMap);
         }
     }
 
     public String getCreate()
     {
-        return addressProps.getString(CREATE);
+        return _addressPropAccess.getString(CREATE);
     }
 
     public String getAssert()
     {
-        return addressProps.getString(ASSERT);
+        return _addressPropAccess.getString(ASSERT);
     }
 
     public String getDelete()
     {
-        return addressProps.getString(DELETE);
-    }
-
-    public boolean isNoLocal()
-    {
-        Boolean b = nodeProps.getBoolean(NO_LOCAL);
-        return b == null ? false : b;
+        return _addressPropAccess.getString(DELETE);
     }
 
     public boolean isBrowseOnly()
     {
-        String mode = addressProps.getString(MODE);
+        String mode = _addressPropAccess.getString(MODE);
         return mode != null && mode.equals(BROWSE) ? true : false;
     }
 
@@ -127,7 +124,7 @@ public class AddressHelper
     public List<Binding> getBindings(Map props)
     {
         List<Binding> bindings = new ArrayList<Binding>();
-        List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
+        List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS);
         if (bindingList != null)
         {
             for (Map bindingMap : bindingList)
@@ -157,117 +154,70 @@ public class AddressHelper
         }
     }
 
-    public int getTargetNodeType() throws Exception
+    public int getNodeType() throws Exception
     {
-        if (nodeProps == null || nodeProps.getString(TYPE) == null)
+        if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
         {
             // need to query and figure out
             return AMQDestination.UNKNOWN_TYPE;
-        } else if (nodeProps.getString(TYPE).equals("queue"))
+        }
+        else if (_nodePropAccess.getString(TYPE).equals("queue"))
         {
             return AMQDestination.QUEUE_TYPE;
-        } else if (nodeProps.getString(TYPE).equals("topic"))
+        }
+        else if (_nodePropAccess.getString(TYPE).equals("topic"))
         {
             return AMQDestination.TOPIC_TYPE;
-        } else
+        }
+        else
         {
             throw new Exception("unkown exchange type");
         }
     }
 
-    public Node getTargetNode(int addressType)
+    public Node getNode()
     {
-        // target node here is the default exchange
-        if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
-        {
-            return new ExchangeNode();
-        } else if (addressType == AMQDestination.TOPIC_TYPE)
-        {
-            Map node = (Map) address.getOptions().get(NODE);
-            return createExchangeNode(node);
-        } else
+        Node node = new Node(_address.getName());
+        if (_nodePropAccess != null)
         {
-            // don't know yet
-            return null;
-        }
-    }
-
-    private Node createExchangeNode(Map parent)
-    {
-        Map declareArgs = getDeclareArgs(parent);
-        MapAccessor argsMap = new MapAccessor(declareArgs);
-        ExchangeNode node = new ExchangeNode();
-        node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
-                .getString(TYPE));
-        fillInCommonNodeArgs(node, parent, argsMap);
-        return node;
-    }
+            Map xDeclareMap = getDeclareArgs(_nodePropMap);
+            MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
 
-    private Node createQueueNode(Map parent)
-    {
-        Map declareArgs = getDeclareArgs(parent);
-        MapAccessor argsMap = new MapAccessor(declareArgs);
-        QueueNode node = new QueueNode();
-        node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
-        node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
-                : argsMap.getBoolean(EXCLUSIVE));
-        fillInCommonNodeArgs(node, parent, argsMap);
-
-        return node;
-    }
-
-    private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
-    {
-        node.setDurable(getDurability(parent));
-        node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
-                : argsMap.getBoolean(AUTO_DELETE));
-        node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
-        node.setBindings(getBindings(parent));
-        if (getDeclareArgs(parent).containsKey(ARGUMENTS))
-        {
-            node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
+            node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false));
+            node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false));
+            node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false));
+            node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+            if (xDeclareMapAccessor.getString(TYPE) != null)
+            {
+                node.setExchangeType(xDeclareMapAccessor.getString(TYPE));
+            }
+            node.setBindings(getBindings(_nodePropMap));
+            if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+            {
+                node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+            }
         }
-    }
-    
-    private boolean getDurability(Map map)
-    {
-        Accessor access = new MapAccessor(map);
-        Boolean result = access.getBoolean(DURABLE);
-        return (result == null) ? false : result.booleanValue();
+        return node;
     }
 
-    /**
-     * if the type == queue x-declare args from the node props is used. if the
-     * type == exchange x-declare args from the link props is used else just
-     * create a default temp queue.
-     */
-    public Node getSourceNode(int addressType)
+    // This should really be in the Accessor interface
+    private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue)
     {
-        if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
-        {
-            return createQueueNode((Map) address.getOptions().get(NODE));
-        }
-        if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
-        {
-            return createQueueNode((Map) address.getOptions().get(LINK));
-        } else
-        {
-            // need to query the info
-            return new QueueNode();
-        }
+        Boolean result = access.getBoolean(propName);
+        return (result == null) ? defaultValue : result.booleanValue();
     }
 
     public Link getLink() throws Exception
     {
         Link link = new Link();
         link.setSubscription(new Subscription());
-        if (linkProps != null)
+        link.setSubscriptionQueue(new SubscriptionQueue());
+        if (_linkPropAccess != null)
         {
-            link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
-                    : linkProps.getBoolean(DURABLE));
-            link.setName(linkProps.getString(NAME));
+            link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false));
+            link.setName(_linkPropAccess.getString(NAME));
 
-            String reliability = linkProps.getString(RELIABILITY);
+            String reliability = _linkPropAccess.getString(RELIABILITY);
             if ( reliability != null)
             {
                 if (reliability.equalsIgnoreCase("unreliable"))
@@ -283,13 +233,12 @@ public class AddressHelper
                     throw new Exception("The reliability mode '" + 
                             reliability + "' is not yet supported");
                 }
-                
             }
             
-            if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+            if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
             {
                 MapAccessor capacityProps = new MapAccessor(
-                        (Map) ((Map) address.getOptions().get(LINK))
+                        (Map) ((Map) _address.getOptions().get(LINK))
                                 .get(CAPACITY));
                 link
                         .setConsumerCapacity(capacityProps
@@ -302,17 +251,19 @@ public class AddressHelper
             } 
             else
             {
-                int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
+                int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
                         .getInt(CAPACITY);
                 link.setConsumerCapacity(cap);
                 link.setProducerCapacity(cap);
             }
-            link.setFilter(linkProps.getString(FILTER));
+            link.setFilter(_linkPropAccess.getString(FILTER));
             // so far filter type not used
             
-            if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+            Map linkMap = (Map) _address.getOptions().get(LINK);
+
+            if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
             {   
-                Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+                Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
                 
                 if (x_subscribe.containsKey(ARGUMENTS))
                 {
@@ -324,6 +275,18 @@ public class AddressHelper
                 
                 link.getSubscription().setExclusive(exclusive);
             }
+
+            link.setBindings(getBindings(linkMap));
+            Map xDeclareMap = getDeclareArgs(linkMap);
+            SubscriptionQueue queue = link.getSubscriptionQueue();
+            if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+            {
+                MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
+                queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true));
+                queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
+                queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+                queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+            }
         }
 
         return link;

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Feb 28 16:14:30 2013
@@ -20,9 +20,14 @@
  */
 package org.apache.qpid.client.messaging.address;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.client.AMQDestination.Binding;
+
 public class Link
 { 
     public enum FilterType { SQL92, XQUERY, SUBJECT }
@@ -36,10 +41,11 @@ public class Link
     private boolean _isDurable;
     private int _consumerCapacity = 0;
     private int _producerCapacity = 0;
-    private Node node;
     private Subscription subscription;
     private Reliability reliability = Reliability.AT_LEAST_ONCE;
-    
+    private List<Binding> _bindings = new ArrayList<Binding>();
+    private SubscriptionQueue _subscriptionQueue;
+
     public Reliability getReliability()
     {
         return reliability;
@@ -50,21 +56,11 @@ public class Link
         this.reliability = reliability;
     }
 
-    public Node getNode()
-    {
-        return node;
-    }
-
-    public void setNode(Node node)
-    {
-        this.node = node;
-    }
-
     public boolean isDurable()
     {
         return _isDurable;
     }
-    
+
     public void setDurable(boolean durable)
     {
         _isDurable = durable;
@@ -139,6 +135,74 @@ public class Link
     {
         this.subscription = subscription;
     }   
+
+    public List<Binding> getBindings()
+    {
+        return _bindings;
+    }
+
+    public void setBindings(List<Binding> bindings)
+    {
+        _bindings = bindings;
+    }
+
+    public SubscriptionQueue getSubscriptionQueue()
+    {
+        return _subscriptionQueue;
+    }
+
+    public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue)
+    {
+        this._subscriptionQueue = subscriptionQueue;
+    }
+
+    public static class SubscriptionQueue
+    {
+        private Map<String,Object> _declareArgs = new HashMap<String,Object>();
+        private boolean _isAutoDelete = true;
+        private boolean _isExclusive = true;
+        private String _alternateExchange;
+
+        public Map<String,Object> getDeclareArgs()
+        {
+            return _declareArgs;
+        }
+
+        public void setDeclareArgs(Map<String,Object> options)
+        {
+            _declareArgs = options;
+        }
+
+        public boolean isAutoDelete()
+        {
+            return _isAutoDelete;
+        }
+
+        public void setAutoDelete(boolean autoDelete)
+        {
+            _isAutoDelete = autoDelete;
+        }
+
+        public boolean isExclusive()
+        {
+            return _isExclusive;
+        }
+
+        public void setExclusive(boolean exclusive)
+        {
+            _isExclusive = exclusive;
+        }
+
+        public String getAlternateExchange()
+        {
+            return _alternateExchange;
+        }
+
+        public void setAlternateExchange(String altExchange)
+        {
+            _alternateExchange = altExchange;
+        }
+    }
     
     public static class Subscription
     {

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java Thu Feb 28 16:14:30 2013
@@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestina
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class Node
+public class Node
 { 
     private int _nodeType = AMQDestination.UNKNOWN_TYPE;
+    private String _name;
     private boolean _isDurable;
     private boolean _isAutoDelete;
+    private boolean _isExclusive;
     private String _alternateExchange;
+    private String _exchangeType = "topic"; // used when node is an exchange instead of a queue.
     private List<Binding> _bindings = new ArrayList<Binding>();
-    private Map<String,Object> _declareArgs = Collections.emptyMap();
+    private Map<String,Object> _declareArgs = new HashMap<String,Object>();
 
-    protected Node(int nodeType)
+    protected Node(String name)
+    {
+        _name = name;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public void setNodeType(int nodeType)
     {
         _nodeType = nodeType;
     }
@@ -58,6 +72,16 @@ public abstract class Node
         _isDurable = durable;
     }
 
+    public boolean isExclusive()
+    {
+        return _isExclusive;
+    }
+
+    public void setExclusive(boolean exclusive)
+    {
+        _isExclusive = exclusive;
+    }
+
     public boolean isAutoDelete()
     {
         return _isAutoDelete;
@@ -100,56 +124,15 @@ public abstract class Node
     public void setDeclareArgs(Map<String,Object> options)
     {
         _declareArgs = options;
-    }   
-    
-    public static class QueueNode extends Node 
+    }
+
+    public void setExchangeType(String type)
     {
-       private boolean _isExclusive;
-       private QpidQueueOptions _queueOptions = new QpidQueueOptions();
-       
-       public QueueNode()
-       {
-           super(AMQDestination.QUEUE_TYPE);
-       }
-       
-       public boolean isExclusive()
-       {
-           return _isExclusive;
-       }
-       
-       public void setExclusive(boolean exclusive)
-       {
-           _isExclusive = exclusive;
-       }  
-    }
-    
-    public static class ExchangeNode extends Node 
-    {
-       private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
-       private String _exchangeType;
-       
-       public ExchangeNode()
-       {
-           super(AMQDestination.TOPIC_TYPE);
-       }
-       
-       public String getExchangeType()
-       {
-           return _exchangeType;
-       }
-       
-       public void setExchangeType(String exchangeType)
-       {
-           _exchangeType = exchangeType;
-       }
-    
+        _exchangeType = type;
     }
-    
-    public static class UnknownNodeType extends Node 
-    {
-        public UnknownNodeType()
-        {
-            super(AMQDestination.UNKNOWN_TYPE);
-        }
+
+    public String getExchangeType()
+    {
+        return _exchangeType;
     }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.protocol;
 
+import org.apache.qpid.client.HeartbeatListener;
 import org.apache.qpid.util.BytesDataOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMetho
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 import java.io.IOException;
@@ -177,6 +179,9 @@ public class AMQProtocolHandler implemen
 
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
+    private long _lastReadTime = System.currentTimeMillis();
+    private long _lastWriteTime = System.currentTimeMillis();
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
@@ -210,48 +215,67 @@ public class AMQProtocolHandler implemen
         }
         else
         {
-            _logger.debug("Session closed called with failover state currently " + _failoverState);
-
-            // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
-            // known through the policy settings.
-
-            if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+            // Use a local variable to record whether fail-over is not allowed,
+            // in order to call AMQConnection#exceptionReceived outside of the synchronized block;
+            // otherwise it might deadlock with the failover mutex.
+            boolean failoverNotAllowed = false;
+            synchronized (this)
             {
-                _logger.debug("FAILOVER STARTING");
-                if (_failoverState == FailoverState.NOT_STARTED)
-                {
-                    _failoverState = FailoverState.IN_PROGRESS;
-                    startFailoverThread();
-                }
-                else
-                {
-                    _logger.debug("Not starting failover as state currently " + _failoverState);
-                }
-            }
-            else
-            {
-                _logger.debug("Failover not allowed by policy."); // or already in progress?
-
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug(_connection.getFailoverPolicy().toString());
+                    _logger.debug("Session closed called with failover state " + _failoverState);
                 }
 
-                if (_failoverState != FailoverState.IN_PROGRESS)
+                // Reconnectability was introduced here so as not to disturb the client, as they have made their
+                // intentions known through the policy settings.
+                if (_failoverState == FailoverState.NOT_STARTED)
                 {
-                    _logger.debug("sessionClose() not allowed to failover");
-                    _connection.exceptionReceived(new AMQDisconnectedException(
-                            "Server closed connection and reconnection " + "not permitted.",
-                            _stateManager.getLastException()));
+                    // close the sender
+                    try
+                    {
+                        _sender.close();
+                    }
+                    catch (Exception e)
+                    {
+                        _logger.warn("Exception occured on closing the sender", e);
+                    }
+                    if (_connection.failoverAllowed())
+                    {
+                        _failoverState = FailoverState.IN_PROGRESS;
+
+                        _logger.debug("FAILOVER STARTING");
+                        startFailoverThread();
+                    }
+                    else if (_connection.isConnected())
+                    {
+                        failoverNotAllowed = true;
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+                        }
+                    }
+                    else
+                    {
+                        _logger.debug("We are in process of establishing the initial connection");
+                    }
                 }
                 else
                 {
-                    _logger.debug("sessionClose() failover in progress");
+                    _logger.debug("Not starting the failover thread as state currently " + _failoverState);
                 }
             }
+
+            if (failoverNotAllowed)
+            {
+                _connection.exceptionReceived(new AMQDisconnectedException(
+                        "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+            }
         }
 
-        _logger.debug("Protocol Session [" + this + "] closed");
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Protocol Session [" + this + "] closed");
+        }
     }
 
     /** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -280,7 +304,6 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         //  failover:
-        HeartbeatDiagnostics.timeout();
         _logger.warn("Timed out while waiting for heartbeat from peer.");
         _network.close();
     }
@@ -289,7 +312,7 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         writeFrame(HeartbeatBody.FRAME);
-        HeartbeatDiagnostics.sent();
+        _heartbeatListener.heartbeatSent();
     }
 
     /**
@@ -297,14 +320,29 @@ public class AMQProtocolHandler implemen
      */
     public void exception(Throwable cause)
     {
-        if (_failoverState == FailoverState.NOT_STARTED)
+        boolean causeIsAConnectionProblem =
+                cause instanceof AMQConnectionClosedException ||
+                cause instanceof IOException ||
+                cause instanceof TransportException;
+
+        if (causeIsAConnectionProblem)
         {
-            if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+            //ensure the IoSender and IoReceiver are closed
+            try
             {
-                _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
-                // this will attempt failover
                 _network.close();
-                closed();
+            }
+            catch (Exception e)
+            {
+                //ignore
+            }
+        }
+        FailoverState state = getFailoverState();
+        if (state == FailoverState.NOT_STARTED)
+        {
+            if (causeIsAConnectionProblem)
+            {
+                _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
             }
             else
             {
@@ -319,7 +357,7 @@ public class AMQProtocolHandler implemen
         }
         // we reach this point if failover was attempted and failed therefore we need to let the calling app
         // know since we cannot recover the situation
-        else if (_failoverState == FailoverState.FAILED)
+        else if (state == FailoverState.FAILED)
         {
             _logger.error("Exception caught by protocol handler: " + cause, cause);
 
@@ -329,6 +367,10 @@ public class AMQProtocolHandler implemen
             propagateExceptionToAllWaiters(amqe);
             _connection.exceptionReceived(cause);
         }
+        else
+        {
+            _logger.warn("Exception caught by protocol handler: " + cause, cause);
+        }
     }
 
     /**
@@ -403,6 +445,7 @@ public class AMQProtocolHandler implemen
     public void received(ByteBuffer msg)
     {
         _readBytes += msg.remaining();
+        _lastReadTime = System.currentTimeMillis();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -431,8 +474,6 @@ public class AMQProtocolHandler implemen
 
                         final AMQBody bodyFrame = frame.getBodyFrame();
 
-                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
                         bodyFrame.handle(frame.getChannel(), _protocolSession);
 
                         _connection.bytesReceived(_readBytes);
@@ -521,6 +562,7 @@ public class AMQProtocolHandler implemen
     public  synchronized void writeFrame(AMQDataBlock frame, boolean flush)
     {
         final ByteBuffer buf = asByteBuffer(frame);
+        _lastWriteTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
         _sender.send(buf);
         if(flush)
@@ -792,14 +834,14 @@ public class AMQProtocolHandler implemen
         return _protocolSession;
     }
 
-    FailoverState getFailoverState()
+    synchronized FailoverState getFailoverState()
     {
         return _failoverState;
     }
 
-    public void setFailoverState(FailoverState failoverState)
+    public synchronized void setFailoverState(FailoverState failoverState)
     {
-        _failoverState = failoverState;
+        _failoverState= failoverState;
     }
 
     public byte getProtocolMajorVersion()
@@ -843,6 +885,23 @@ public class AMQProtocolHandler implemen
         _sender = sender;
     }
 
+    @Override
+    public long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    @Override
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
+    protected Sender<ByteBuffer> getSender()
+    {
+        return _sender;
+    }
+
     /** @param delay delay in seconds (not ms) */
     void initHeartbeats(int delay)
     {
@@ -850,7 +909,6 @@ public class AMQProtocolHandler implemen
         {
             _network.setMaxWriteIdle(delay);
             _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
-            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
 
@@ -865,5 +923,13 @@ public class AMQProtocolHandler implemen
     }
 
 
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 
+    public void heartbeatBodyReceived()
+    {
+        _heartbeatListener.heartbeatReceived();
+    }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Feb 28 16:14:30 2013
@@ -48,6 +48,8 @@ import org.apache.qpid.transport.Transpo
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
+
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -265,7 +267,7 @@ public class AMQProtocolSession implemen
 
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
     {
-
+        _protocolHandler.heartbeatBodyReceived();
     }
 
     /**
@@ -372,6 +374,11 @@ public class AMQProtocolSession implemen
         }
     }
 
+    public Sender<ByteBuffer> getSender()
+    {
+        return _protocolHandler.getSender();
+    }
+
     public void failover(String host, int port)
     {
         _protocolHandler.failover(host, port);

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java Thu Feb 28 16:14:30 2013
@@ -28,8 +28,10 @@ import org.apache.qpid.util.FileUtils;
 import javax.security.sasl.SaslClientFactory;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.Provider;
 import java.security.Security;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
@@ -67,10 +69,10 @@ public class DynamicSaslRegistrar
     }
 
     /** Reads the properties file, and creates a dynamic security provider to register the SASL implementations with. */
-    public static void registerSaslProviders()
+    public static ProviderRegistrationResult registerSaslProviders()
     {
         _logger.debug("public static void registerSaslProviders(): called");
-
+        ProviderRegistrationResult result = ProviderRegistrationResult.FAILED;
         // Open the SASL properties file, using the default name is one is not specified.
         String filename = System.getProperty(FILE_PROPERTY);
         InputStream is =
@@ -89,22 +91,45 @@ public class DynamicSaslRegistrar
             if (factories.size() > 0)
             {
                 // Ensure we are used before the defaults
-                if (Security.insertProviderAt(new JCAProvider(factories), 1) == -1)
+                JCAProvider qpidProvider = new JCAProvider(factories);
+                if (Security.insertProviderAt(qpidProvider, 1) == -1)
                 {
-                    _logger.error("Unable to load custom SASL providers.");
+                    Provider registeredProvider = findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME);
+                    if (registeredProvider == null)
+                    {
+                        result = ProviderRegistrationResult.FAILED;
+                        _logger.error("Unable to load custom SASL providers.");
+                    }
+                    else if (registeredProvider.equals(qpidProvider))
+                    {
+                        result = ProviderRegistrationResult.EQUAL_ALREADY_REGISTERED;
+                        _logger.debug("Custom SASL provider is already registered with equal properties.");
+                    }
+                    else
+                    {
+                        result = ProviderRegistrationResult.DIFFERENT_ALREADY_REGISTERED;
+                        _logger.warn("Custom SASL provider was already registered with different properties.");
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Custom SASL provider " + registeredProvider + " properties: " + new HashMap<Object, Object>(registeredProvider));
+                        }
+                    }
                 }
                 else
                 {
+                    result = ProviderRegistrationResult.SUCCEEDED;
                     _logger.info("Additional SASL providers successfully registered.");
                 }
             }
             else
             {
-                _logger.warn("No additional SASL providers registered.");
+                result = ProviderRegistrationResult.NO_SASL_FACTORIES;
+                _logger.warn("No additional SASL factories found to register.");
             }
         }
         catch (IOException e)
         {
+            result = ProviderRegistrationResult.FAILED;
             _logger.error("Error reading properties: " + e, e);
         }
         finally
@@ -122,6 +147,22 @@ public class DynamicSaslRegistrar
                 }
             }
         }
+        return result;
+    }
+
+    static Provider findProvider(String name)
+    {
+        Provider[] providers = Security.getProviders();
+        Provider registeredProvider = null;
+        for (Provider provider : providers)
+        {
+            if (name.equals(provider.getName()))
+            {
+                registeredProvider = provider;
+                break;
+            }
+        }
+        return registeredProvider;
     }
 
     /**
@@ -158,15 +199,24 @@ public class DynamicSaslRegistrar
                     continue;
                 }
 
-                _logger.debug("Registering class "+ clazz.getName() +" for mechanism "+mechanism);
+                _logger.debug("Found class "+ clazz.getName() +" for mechanism "+mechanism);
                 factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
             }
             catch (Exception ex)
             {
-                _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+                _logger.error("Error instantiating SaslClientFactory class " + className + " - skipping");
             }
         }
 
         return factoriesToRegister;
     }
+
+    public static enum ProviderRegistrationResult
+    {
+        SUCCEEDED,
+        EQUAL_ALREADY_REGISTERED,
+        DIFFERENT_ALREADY_REGISTERED,
+        NO_SASL_FACTORIES,
+        FAILED;
+    }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java Thu Feb 28 16:14:30 2013
@@ -39,6 +39,11 @@ import java.util.Map;
  */
 public class JCAProvider extends Provider
 {
+    static final String QPID_CLIENT_SASL_PROVIDER_NAME = "AMQSASLProvider-Client";
+    static final String QPID_CLIENT_SASL_PROVIDER_INFO = "A JCA provider that registers all "
+                                                       + "AMQ SASL providers that want to be registered";
+    static final double QPID_CLIENT_SASL_PROVIDER_VERSION = 1.0;
+
     private static final Logger log = LoggerFactory.getLogger(JCAProvider.class);
 
     /**
@@ -48,8 +53,7 @@ public class JCAProvider extends Provide
      */
     public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
     {
-        super("AMQSASLProvider-Client", 1.0, "A JCA provider that registers all "
-            + "AMQ SASL providers that want to be registered");
+        super(QPID_CLIENT_SASL_PROVIDER_NAME, QPID_CLIENT_SASL_PROVIDER_VERSION, QPID_CLIENT_SASL_PROVIDER_INFO);
         register(providerMap);
     }
 
@@ -63,7 +67,7 @@ public class JCAProvider extends Provide
         for (Map.Entry<String, Class<? extends SaslClientFactory>> me : providerMap.entrySet())
         {
             put( "SaslClientFactory."+me.getKey(), me.getValue().getName());
-            log.debug("Registered SASL Client factory for " + me.getKey() + " as " + me.getValue().getName());
+            log.debug("Recording SASL Client factory for " + me.getKey() + " as " + me.getValue().getName());
         }
     }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Feb 28 16:14:30 2013
@@ -157,12 +157,15 @@ public class AMQStateManager implements 
 
         if (_waiters.size() == 0)
         {
-            _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+            _logger.info("No Waiters for error. Saving as last error:" + error.getMessage());
             _lastException = error;
         }
         for (StateWaiter waiter : _waiters)
         {
-            _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Notifying waiter " + waiter + " for error:" + error.getMessage());
+            }
             waiter.error(error);
         }
     }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.transport;
 
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSException;
 import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
     }
 
     private final ConnectionURL _connectionURL;
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
 
         return null;
     }
+
+    @Override
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    {
+        // ClientDelegate simply responds to heartbeats with heartbeats
+        _heartbeatListener.heartbeatReceived();
+        super.connectionHeartbeat(conn, hearbeat);
+        _heartbeatListener.heartbeatSent();
+    }
+
+
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Thu Feb 28 16:14:30 2013
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.jms;
 
-import org.apache.qpid.framing.AMQShortString;
-
 import java.util.List;
+import org.apache.qpid.framing.AMQShortString;
 
 /**
  Connection URL format
@@ -35,14 +34,22 @@ public interface ConnectionURL
     public static final String AMQ_PROTOCOL = "amqp";
     public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
     public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
-    public static final String OPTIONS_SYNC_ACK = "sync_ack";    
+    public static final String OPTIONS_SYNC_ACK = "sync_ack";
     public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
     public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
+    public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format";
     public static final String OPTIONS_BROKERLIST = "brokerlist";
     public static final String OPTIONS_FAILOVER = "failover";
     public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
 
     /**
+     * This option is used to apply a connection-level override of
+     * the {@value BrokerDetails#OPTIONS_SSL} option values in the
+     * {@value ConnectionURL#OPTIONS_BROKERLIST}.
+     */
+    public static final String OPTIONS_SSL = "ssl";
+
+    /**
      * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
      * <p>
      * It tells the client to delegate the requeue/DLQ decision to the
@@ -54,9 +61,11 @@ public interface ConnectionURL
     public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
     public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
     public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+    public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend";
+
     public static final byte  URL_0_8 = 1;
     public static final byte  URL_0_10 = 2;
-    
+
     String getURL();
 
     String getFailoverMethod();

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Thu Feb 28 16:14:30 2013
@@ -23,25 +23,11 @@ package org.apache.qpid.jms;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import java.io.UnsupportedEncodingException;
 
 /**
  */
 public interface MessageProducer extends javax.jms.MessageProducer
 {
-    /**
-     * Set the default MIME type for messages produced by this producer. This reduces the overhead of each message.
-     * @param mimeType
-     */
-    void setMimeType(String mimeType) throws JMSException;
-
-    /**
-     * Set the default encoding for messages produced by this producer. This reduces the overhead of each message.
-     * @param encoding the encoding as understood by XXXX how do I specify this?? RG
-     * @throws UnsupportedEncodingException if the encoding is not understood
-     */
-    void setEncoding(String encoding) throws UnsupportedEncodingException, JMSException;
-    
     void send(Destination destination, Message message, int deliveryMode,
                      int priority, long timeToLive, boolean immediate)
             throws JMSException;

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java Thu Feb 28 16:14:30 2013
@@ -21,6 +21,7 @@
 package org.apache.qpid.jms;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ListMessage;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -100,4 +101,6 @@ public interface Session extends TopicSe
     AMQShortString getDefaultTopicExchangeName();
 
     AMQShortString getTemporaryQueueExchangeName();
+
+    ListMessage createListMessage() throws JMSException;
 }

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java Thu Feb 28 16:14:30 2013
@@ -20,25 +20,60 @@
  */
 package org.apache.qpid.client;
 
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQInvalidArgumentException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import java.util.concurrent.atomic.AtomicReference;
 
-public class AMQConnectionUnitTest extends TestCase
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AMQConnectionUnitTest extends QpidTestCase
 {
+    String _url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+
+    public void testVerifyQueueOnSendDefault() throws Exception
+    {
+        MockAMQConnection connection = new MockAMQConnection(_url);
+        assertFalse(connection.validateQueueOnSend());
+    }
+
+    public void testVerifyQueueOnSendViaSystemProperty() throws Exception
+    {
+        setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true");
+        MockAMQConnection connection = new MockAMQConnection(_url);
+        assertTrue(connection.validateQueueOnSend());
+
+        setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false");
+        connection = new MockAMQConnection(_url);
+        assertFalse(connection.validateQueueOnSend());
+    }
+
+    public void testVerifyQueueOnSendViaURL() throws Exception
+    {
+        MockAMQConnection connection = new MockAMQConnection(_url + "&" +  ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'");
+        assertTrue(connection.validateQueueOnSend());
+
+        connection = new MockAMQConnection(_url + "&" +  ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='false'");
+        assertFalse(connection.validateQueueOnSend());
+    }
+
+    public void testVerifyQueueOnSendViaURLoverridesSystemProperty() throws Exception
+    {
+        setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false");
+        MockAMQConnection connection = new MockAMQConnection(_url + "&" +  ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'");
+        assertTrue(connection.validateQueueOnSend());
+    }
 
     public void testExceptionReceived()
     {
-        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
         AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null);
         final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>();
         try
         {
-            MockAMQConnection connection = new MockAMQConnection(url);
+            MockAMQConnection connection = new MockAMQConnection(_url);
             connection.setExceptionListener(new ExceptionListener()
             {
 
@@ -62,4 +97,22 @@ public class AMQConnectionUnitTest exten
         assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException());
     }
 
+    /**
+     * This should expand to test all the defaults.
+     */
+    public void testDefaultStreamMessageEncoding() throws Exception
+    {
+        MockAMQConnection connection = new MockAMQConnection(_url);
+        assertTrue("Legacy Stream message encoding should be the default",connection.isUseLegacyStreamMessageFormat());
+    }
+
+    /**
+     * This should expand to test all the connection properties.
+     */
+    public void testStreamMessageEncodingProperty() throws Exception
+    {
+        MockAMQConnection connection = new MockAMQConnection(_url + "&use_legacy_stream_msg_format='false'");
+        assertFalse("Stream message encoding should be amqp/list",connection.isUseLegacyStreamMessageFormat());
+    }
+
 }

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Feb 28 16:14:30 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.client.message.AMQPEncodedListMessage;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.*;
@@ -28,6 +29,8 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.StreamMessage;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -276,7 +279,7 @@ public class AMQSession_0_10Test extends
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                     null, null, false, true);
-            session.sendConsume(consumer, new AMQShortString("test"), null, true, 1);
+            session.sendConsume(consumer, new AMQShortString("test"), true, 1);
         }
         catch (Exception e)
         {
@@ -459,6 +462,13 @@ public class AMQSession_0_10Test extends
         assertNotNull("ExchangeDeclare event was not sent", event);
     }
 
+    public void testCreateStreamMessage() throws Exception
+    {
+        AMQSession_0_10 session = createAMQSession_0_10();
+        StreamMessage m = session.createStreamMessage();
+        assertFalse("Legacy Stream message encoding should be the default, but was " + m.getClass(), m instanceof AMQPEncodedListMessage);
+    }
+
     public void testGetQueueDepthWithSync()
     {
         // slow down a flush thread
@@ -587,7 +597,7 @@ public class AMQSession_0_10Test extends
         connection.setSessionFactory(new SessionFactory()
         {
 
-            public Session newSession(Connection conn, Binary name, long expiry)
+            public Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay)
             {
                 return new MockSession(conn, new SessionDelegate(), name, expiry, throwException);
             }
@@ -660,7 +670,6 @@ public class AMQSession_0_10Test extends
             if (m instanceof ExchangeBound)
             {
                 ExchangeBoundResult struc = new ExchangeBoundResult();
-                struc.setQueueNotFound(true);
                 result.setValue(struc);
             }
             else if (m instanceof ExchangeQuery)

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java Thu Feb 28 16:14:30 2013
@@ -48,7 +48,7 @@ public class BasicMessageConsumer_0_8_Te
 
         TestAMQSession testSession = new TestAMQSession(conn);
         BasicMessageConsumer_0_8 consumer =
-                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
     }
@@ -68,7 +68,7 @@ public class BasicMessageConsumer_0_8_Te
 
         final TestAMQSession testSession = new TestAMQSession(conn);
         final BasicMessageConsumer_0_8 consumer =
-                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
     }
@@ -94,7 +94,7 @@ public class BasicMessageConsumer_0_8_Te
 
         TestAMQSession testSession = new TestAMQSession(conn);
         BasicMessageConsumer_0_8 consumer =
-                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
     }

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Thu Feb 28 16:14:30 2013
@@ -120,6 +120,48 @@ public class BrokerDetailsTest extends T
         {
             assertTrue(urise.getReason().equals("Illegal character in port number"));
         }
+    }
+
+    public void testToStringMasksKeyStorePassword() throws Exception
+    {
+        String url = "tcp://localhost:5672?key_store_password='password'";
+        BrokerDetails details = new AMQBrokerDetails(url);
+
+        String expectedToString = "tcp://localhost:5672?key_store_password='********'";
+        String actualToString = details.toString();
+
+        assertEquals("Unexpected toString", expectedToString, actualToString);
+    }
+
+    public void testToStringMasksTrustStorePassword() throws Exception
+    {
+        String url = "tcp://localhost:5672?trust_store_password='password'";
+        BrokerDetails details = new AMQBrokerDetails(url);
+
+        String expectedToString = "tcp://localhost:5672?trust_store_password='********'";
+        String actualToString = details.toString();
+
+        assertEquals("Unexpected toString", expectedToString, actualToString);
+    }
+
+    public void testDefaultSsl() throws URLSyntaxException
+    {
+        String brokerURL = "tcp://localhost:5672";
+        AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+        assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_SSL));
+    }
+
+    public void testOverridingSsl() throws URLSyntaxException
+    {
+        String brokerURL = "tcp://localhost:5672?ssl='true'";
+        AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+        assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+
+        brokerURL = "tcp://localhost:5672?ssl='false''&maxprefetch='1'";
+        broker = new AMQBrokerDetails(brokerURL);
 
+        assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
     }
 }

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Thu Feb 28 16:14:30 2013
@@ -30,7 +30,6 @@ import org.apache.qpid.url.URLSyntaxExce
 
 public class ConnectionURLTest extends TestCase
 {
-
     public void testFailoverURL() throws URLSyntaxException
     {
         String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
@@ -252,55 +251,47 @@ public class ConnectionURLTest extends T
         assertTrue(service.getPort() == 5672);
     }
 
-    public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException
+    public void testConnectionURLOptionToStringMasksPassword() throws URLSyntaxException
     {
-        String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'";
+        String url = "amqp://guest:guest@client/localhost?brokerlist='tcp://localhost:1234'";
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        String expectedToString = "amqp://guest:********@client/localhost?brokerlist='tcp://localhost:1234'";
+        String actualToString = connectionurl.toString();
+        assertEquals("Unexpected toString form", expectedToString, actualToString);
+    }
+
+    public void testConnectionURLOptionToStringMasksSslTrustStorePassword() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?trust_store_password='truststorepassword''";
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
 
-//        ConnectionURL connectionurl = new AMQConnectionURL(url);
-//
-//        assertTrue(connectionurl.getFailoverMethod() == null);
-//        assertTrue(connectionurl.getUsername().equals("guest"));
-//        assertTrue(connectionurl.getPassword().equals("guest"));
-//        assertTrue(connectionurl.getVirtualHost().equals("/temp"));
-//
-//
-//        assertTrue(connectionurl.getBrokerCount() == 1);
-//
-//        BrokerDetails service = connectionurl.getBrokerDetails(0);
-//
-//        assertTrue(service.getTransport().equals("tcp"));
-//
-//        assertTrue(service.getHost().equals("127.0.0.1"));
-//        assertTrue(service.getPort() == 1234);
+        String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?trust_store_password='********''";
+        String actualToString = connectionurl.toString();
+        assertEquals("Unexpected toString form", expectedToString, actualToString);
+    }
+
+    public void testConnectionURLOptionToStringMasksSslKeyStorePassword() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?key_store_password='keystorepassword1';tcp://host:1235?key_store_password='keystorepassword2''";
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?key_store_password='********';tcp://host:1235?key_store_password='********''";
+        String actualToString = connectionurl.toString();
+        assertEquals("Unexpected toString form", expectedToString, actualToString);
     }
 
     /**
      * Test for QPID-3662 to ensure the {@code toString()} representation is correct.
      */
-    public void testConnectionURLOptionToString() throws URLSyntaxException
+    public void testConnectionURLOptionToStringWithMaxPreftech() throws URLSyntaxException
     {
         String url = "amqp://guest:guest@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
         ConnectionURL connectionurl = new AMQConnectionURL(url);
 
-        assertNull(connectionurl.getFailoverMethod());
-        assertEquals("guest", connectionurl.getUsername());
-        assertEquals("guest", connectionurl.getPassword());
-        assertEquals("client", connectionurl.getClientName());
-        assertEquals("/localhost", connectionurl.getVirtualHost());
-        assertEquals("1", connectionurl.getOption("maxprefetch"));
-        assertTrue(connectionurl.getBrokerCount() == 1);
-
-        BrokerDetails service = connectionurl.getBrokerDetails(0);
-        assertTrue(service.getTransport().equals("tcp"));
-        assertTrue(service.getHost().equals("localhost"));
-        assertTrue(service.getPort() == 1234);
-        assertTrue(service.getProperties().containsKey("tcp_nodelay"));
-        assertEquals("true", service.getProperties().get("tcp_nodelay"));
-        
-        String nopasswd = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
-        String tostring = connectionurl.toString();
-        assertEquals(tostring.indexOf("maxprefetch"), tostring.lastIndexOf("maxprefetch"));
-        assertEquals(nopasswd, tostring);
+        String expectedToString = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
+        String actualToString = connectionurl.toString();
+        assertEquals("Unexpected toString form", expectedToString, actualToString);
     }
 
     public void testSingleTransportMultiOptionURL() throws URLSyntaxException
@@ -572,9 +563,64 @@ public class ConnectionURLTest extends T
                 connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
     }
 
-    public static junit.framework.Test suite()
+    /**
+     * Verify that when the ssl option is not specified, asking for the option returns null,
+     * such that this can later be used to verify it wasn't specified.
+     */
+    public void testDefaultSsl() throws URLSyntaxException
     {
-        return new junit.framework.TestSuite(ConnectionURLTest.class);
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL));
+    }
+
+    /**
+     * Verify that when the ssl option is specified, asking for the option returns the value,
+     * such that this can later be used to verify what value it was specified as.
+     */
+    public void testOverridingSsl() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='true'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+
+        url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='false'";
+        connectionURL = new AMQConnectionURL(url);
+
+        assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+    }
+
+    /**
+     * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is not
+     * specified, asking for the option returns null, such that this can later be used to
+     * verify it wasn't specified.
+     */
+    public void testDefaultVerifyQueueOnSend() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertNull("default verifyQueueOnSend value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+    }
+
+    /**
+     * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is
+     * specified, asking for the option returns the value, such that this can later be used
+     * to verify what value it was specified as.
+     */
+    public void testOverridingVerifyQueueOnSend() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='true'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)));
+
+        url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='false'";
+        connectionURL = new AMQConnectionURL(url);
+
+        assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)));
     }
 }
 

Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Feb 28 16:14:30 2013
@@ -193,6 +193,126 @@ public class DestinationURLTest extends 
         assertTrue(dest.getQueueName().equals("test:testQueueD"));
     }
 
+    public void testExchangeOptionsNotPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertFalse(dest.isExchangeAutoDelete());
+        assertFalse(dest.isExchangeDurable());
+        assertFalse(dest.isExchangeInternal());
+    }
+
+    public void testExchangeAutoDeleteOptionPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+
+        assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertTrue(dest.isExchangeAutoDelete());
+        assertFalse(dest.isExchangeDurable());
+        assertFalse(dest.isExchangeInternal());
+    }
+
+    public void testExchangeDurableOptionPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+
+        assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertTrue(dest.isExchangeDurable());
+        assertFalse(dest.isExchangeAutoDelete());
+        assertFalse(dest.isExchangeInternal());
+    }
+
+    public void testExchangeInternalOptionPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_INTERNAL + "='true'";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+
+        assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+        assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertTrue(dest.isExchangeInternal());
+        assertFalse(dest.isExchangeDurable());
+        assertFalse(dest.isExchangeAutoDelete());
+    }
+
     public void testRejectBehaviourPresent() throws URISyntaxException
     {
         String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org