You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [36/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Feb 28 16:14:30 2013
@@ -50,29 +50,28 @@ public class BasicMessageProducer_0_8 ex
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
{
-
- final MethodRegistry methodRegistry = getSession().getMethodRegistry();
- ExchangeDeclareBody body =
+ if(getSession().isDeclareExchanges())
+ {
+ final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+ ExchangeDeclareBody body =
methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getExchangeClass(),
destination.getExchangeName().toString().startsWith("amq."),
- false,
- false,
- false,
+ destination.isExchangeDurable(),
+ destination.isExchangeAutoDelete(),
+ destination.isExchangeInternal(),
true,
null);
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
-
- AMQFrame declare = body.generateFrame(getChannelId());
+ AMQFrame declare = body.generateFrame(getChannelId());
- getProtocolHandler().writeFrame(declare);
+ getConnection().getProtocolHandler().writeFrame(declare);
+ }
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
@@ -172,7 +171,7 @@ public class BasicMessageProducer_0_8 ex
throw jmse;
}
- getProtocolHandler().writeFrame(compositeFrame);
+ getConnection().getProtocolHandler().writeFrame(compositeFrame);
}
/**
@@ -234,4 +233,9 @@ public class BasicMessageProducer_0_8 ex
return frameCount;
}
+ @Override
+ public AMQSession_0_8 getSession()
+ {
+ return (AMQSession_0_8) super.getSession();
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Thu Feb 28 16:14:30 2013
@@ -88,7 +88,7 @@ public class XASessionImpl extends AMQSe
*/
public void createSession()
{
- _qpidDtxSession = getQpidConnection().createSession(0);
+ _qpidDtxSession = getQpidConnection().createSession(0,true);
_qpidDtxSession.setSessionListener(this);
_qpidDtxSession.dtxSelect();
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.handler;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,8 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
@@ -91,18 +95,24 @@ public class ConnectionCloseMethodHandle
}
finally
{
+ Sender<ByteBuffer> sender = session.getSender();
if (error != null)
{
session.notifyError(error);
- }
-
- // Close the protocol Session, including any open TCP connections
- session.closeProtocolSession();
+ }
- // Closing the session should not introduce a race condition as this thread will continue to propgate any
- // exception in to the exceptionCaught method of the SessionHandler.
- // Any sessionClosed event should occur after this.
+ // Close the open TCP connection
+ try
+ {
+ sender.close();
+ }
+ catch(TransportException e)
+ {
+ //Ignore, they are already logged by the Sender and this
+ //is a connection-close being processed by the IoReceiver
+ //which will as it closes initiate failover if necessary.
+ }
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Feb 28 16:14:30 2013
@@ -91,6 +91,7 @@ public class AMQMessageDelegate_0_10 ext
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
+ private String _messageID;
protected AMQMessageDelegate_0_10()
{
@@ -171,8 +172,12 @@ public class AMQMessageDelegate_0_10 ext
public String getJMSMessageID() throws JMSException
{
- UUID id = _messageProps.getMessageId();
- return id == null ? null : "ID:" + id;
+ if (_messageID == null && _messageProps.getMessageId() != null)
+ {
+ UUID id = _messageProps.getMessageId();
+ _messageID = "ID:" + id;
+ }
+ return _messageID;
}
public void setJMSMessageID(String messageId) throws JMSException
@@ -185,14 +190,7 @@ public class AMQMessageDelegate_0_10 ext
{
if(messageId.startsWith("ID:"))
{
- try
- {
- _messageProps.setMessageId(UUID.fromString(messageId.substring(3)));
- }
- catch(IllegalArgumentException ex)
- {
- throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID");
- }
+ _messageID = messageId;
}
else
{
@@ -201,6 +199,7 @@ public class AMQMessageDelegate_0_10 ext
}
}
+ /* Used by the internal implementation */
public void setJMSMessageID(UUID messageId) throws JMSException
{
if(messageId == null)
@@ -344,7 +343,7 @@ public class AMQMessageDelegate_0_10 ext
int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
}
else
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Feb 28 16:14:30 2013
@@ -196,7 +196,14 @@ public class JMSObjectMessage extends Ab
if (data != null && data.hasRemaining())
{
ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data));
- result = (Serializable) in.readObject();
+ try
+ {
+ result = (Serializable) in.readObject();
+ }
+ finally
+ {
+ in.close();
+ }
}
return result;
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Feb 28 16:14:30 2013
@@ -44,7 +44,12 @@ public class JMSStreamMessage extends Ab
}
+ JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws AMQException
+ {
+ super(delegateFactory, data!=null);
+ _typedBytesContentWriter = new TypedBytesContentWriter();
+ }
JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Feb 28 16:14:30 2013
@@ -66,6 +66,7 @@ public class MessageFactoryRegistry
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
+ mf.registerFactory(AMQPEncodedListMessage.MIME_TYPE, new AMQPEncodedListMessageFactory());
mf.registerFactory(null, mf._default);
return mf;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Feb 28 16:14:30 2013
@@ -20,21 +20,20 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Link.Subscription;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.configuration.Accessor;
import org.apache.qpid.configuration.Accessor.MapAccessor;
import org.apache.qpid.messaging.Address;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* Utility class for extracting information from the address class
*/
@@ -68,58 +67,56 @@ public class AddressHelper
public static final String ARGUMENTS = "arguments";
public static final String RELIABILITY = "reliability";
- private Address address;
- private Accessor addressProps;
- private Accessor nodeProps;
- private Accessor linkProps;
+ private Address _address;
+ private Accessor _addressPropAccess;
+ private Accessor _nodePropAccess;
+ private Accessor _linkPropAccess;
+ private Map _addressPropMap;
+ private Map _nodePropMap;
+ private Map _linkPropMap;
public AddressHelper(Address address)
{
- this.address = address;
- addressProps = new MapAccessor(address.getOptions());
- Map node_props = address.getOptions() == null
+ this._address = address;
+ this._addressPropMap = address.getOptions();
+ this._addressPropAccess = new MapAccessor(_addressPropMap);
+ this._nodePropMap = address.getOptions() == null
|| address.getOptions().get(NODE) == null ? null
: (Map) address.getOptions().get(NODE);
- if (node_props != null)
+ if (_nodePropMap != null)
{
- nodeProps = new MapAccessor(node_props);
+ _nodePropAccess = new MapAccessor(_nodePropMap);
}
- Map link_props = address.getOptions() == null
+ this._linkPropMap = address.getOptions() == null
|| address.getOptions().get(LINK) == null ? null
: (Map) address.getOptions().get(LINK);
- if (link_props != null)
+ if (_linkPropMap != null)
{
- linkProps = new MapAccessor(link_props);
+ _linkPropAccess = new MapAccessor(_linkPropMap);
}
}
public String getCreate()
{
- return addressProps.getString(CREATE);
+ return _addressPropAccess.getString(CREATE);
}
public String getAssert()
{
- return addressProps.getString(ASSERT);
+ return _addressPropAccess.getString(ASSERT);
}
public String getDelete()
{
- return addressProps.getString(DELETE);
- }
-
- public boolean isNoLocal()
- {
- Boolean b = nodeProps.getBoolean(NO_LOCAL);
- return b == null ? false : b;
+ return _addressPropAccess.getString(DELETE);
}
public boolean isBrowseOnly()
{
- String mode = addressProps.getString(MODE);
+ String mode = _addressPropAccess.getString(MODE);
return mode != null && mode.equals(BROWSE) ? true : false;
}
@@ -127,7 +124,7 @@ public class AddressHelper
public List<Binding> getBindings(Map props)
{
List<Binding> bindings = new ArrayList<Binding>();
- List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
+ List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS);
if (bindingList != null)
{
for (Map bindingMap : bindingList)
@@ -157,117 +154,70 @@ public class AddressHelper
}
}
- public int getTargetNodeType() throws Exception
+ public int getNodeType() throws Exception
{
- if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
// need to query and figure out
return AMQDestination.UNKNOWN_TYPE;
- } else if (nodeProps.getString(TYPE).equals("queue"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("queue"))
{
return AMQDestination.QUEUE_TYPE;
- } else if (nodeProps.getString(TYPE).equals("topic"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("topic"))
{
return AMQDestination.TOPIC_TYPE;
- } else
+ }
+ else
{
throw new Exception("unkown exchange type");
}
}
- public Node getTargetNode(int addressType)
+ public Node getNode()
{
- // target node here is the default exchange
- if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
- {
- return new ExchangeNode();
- } else if (addressType == AMQDestination.TOPIC_TYPE)
- {
- Map node = (Map) address.getOptions().get(NODE);
- return createExchangeNode(node);
- } else
+ Node node = new Node(_address.getName());
+ if (_nodePropAccess != null)
{
- // don't know yet
- return null;
- }
- }
-
- private Node createExchangeNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- ExchangeNode node = new ExchangeNode();
- node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
- .getString(TYPE));
- fillInCommonNodeArgs(node, parent, argsMap);
- return node;
- }
+ Map xDeclareMap = getDeclareArgs(_nodePropMap);
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
- private Node createQueueNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- QueueNode node = new QueueNode();
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
- : argsMap.getBoolean(EXCLUSIVE));
- fillInCommonNodeArgs(node, parent, argsMap);
-
- return node;
- }
-
- private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
- {
- node.setDurable(getDurability(parent));
- node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
- : argsMap.getBoolean(AUTO_DELETE));
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setBindings(getBindings(parent));
- if (getDeclareArgs(parent).containsKey(ARGUMENTS))
- {
- node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
+ node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false));
+ node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false));
+ node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false));
+ node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ if (xDeclareMapAccessor.getString(TYPE) != null)
+ {
+ node.setExchangeType(xDeclareMapAccessor.getString(TYPE));
+ }
+ node.setBindings(getBindings(_nodePropMap));
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
- }
-
- private boolean getDurability(Map map)
- {
- Accessor access = new MapAccessor(map);
- Boolean result = access.getBoolean(DURABLE);
- return (result == null) ? false : result.booleanValue();
+ return node;
}
- /**
- * if the type == queue x-declare args from the node props is used. if the
- * type == exchange x-declare args from the link props is used else just
- * create a default temp queue.
- */
- public Node getSourceNode(int addressType)
+ // This should really be in the Accessor interface
+ private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue)
{
- if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(NODE));
- }
- if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(LINK));
- } else
- {
- // need to query the info
- return new QueueNode();
- }
+ Boolean result = access.getBoolean(propName);
+ return (result == null) ? defaultValue : result.booleanValue();
}
public Link getLink() throws Exception
{
Link link = new Link();
link.setSubscription(new Subscription());
- if (linkProps != null)
+ link.setSubscriptionQueue(new SubscriptionQueue());
+ if (_linkPropAccess != null)
{
- link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
- : linkProps.getBoolean(DURABLE));
- link.setName(linkProps.getString(NAME));
+ link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false));
+ link.setName(_linkPropAccess.getString(NAME));
- String reliability = linkProps.getString(RELIABILITY);
+ String reliability = _linkPropAccess.getString(RELIABILITY);
if ( reliability != null)
{
if (reliability.equalsIgnoreCase("unreliable"))
@@ -283,13 +233,12 @@ public class AddressHelper
throw new Exception("The reliability mode '" +
reliability + "' is not yet supported");
}
-
}
- if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) address.getOptions().get(LINK))
+ (Map) ((Map) _address.getOptions().get(LINK))
.get(CAPACITY));
link
.setConsumerCapacity(capacityProps
@@ -302,17 +251,19 @@ public class AddressHelper
}
else
{
- int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
- link.setFilter(linkProps.getString(FILTER));
+ link.setFilter(_linkPropAccess.getString(FILTER));
// so far filter type not used
- if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ Map linkMap = (Map) _address.getOptions().get(LINK);
+
+ if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
{
- Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
if (x_subscribe.containsKey(ARGUMENTS))
{
@@ -324,6 +275,18 @@ public class AddressHelper
link.getSubscription().setExclusive(exclusive);
}
+
+ link.setBindings(getBindings(linkMap));
+ Map xDeclareMap = getDeclareArgs(linkMap);
+ SubscriptionQueue queue = link.getSubscriptionQueue();
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
+ queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true));
+ queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
+ queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
return link;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Feb 28 16:14:30 2013
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.client.AMQDestination.Binding;
+
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
@@ -36,10 +41,11 @@ public class Link
private boolean _isDurable;
private int _consumerCapacity = 0;
private int _producerCapacity = 0;
- private Node node;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
-
+ private List<Binding> _bindings = new ArrayList<Binding>();
+ private SubscriptionQueue _subscriptionQueue;
+
public Reliability getReliability()
{
return reliability;
@@ -50,21 +56,11 @@ public class Link
this.reliability = reliability;
}
- public Node getNode()
- {
- return node;
- }
-
- public void setNode(Node node)
- {
- this.node = node;
- }
-
public boolean isDurable()
{
return _isDurable;
}
-
+
public void setDurable(boolean durable)
{
_isDurable = durable;
@@ -139,6 +135,74 @@ public class Link
{
this.subscription = subscription;
}
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public SubscriptionQueue getSubscriptionQueue()
+ {
+ return _subscriptionQueue;
+ }
+
+ public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue)
+ {
+ this._subscriptionQueue = subscriptionQueue;
+ }
+
+ public static class SubscriptionQueue
+ {
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
+ private boolean _isAutoDelete = true;
+ private boolean _isExclusive = true;
+ private String _alternateExchange;
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _declareArgs;
+ }
+
+ public void setDeclareArgs(Map<String,Object> options)
+ {
+ _declareArgs = options;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+ }
public static class Subscription
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java Thu Feb 28 16:14:30 2013
@@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestina
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class Node
+public class Node
{
private int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ private String _name;
private boolean _isDurable;
private boolean _isAutoDelete;
+ private boolean _isExclusive;
private String _alternateExchange;
+ private String _exchangeType = "topic"; // used when node is an exchange instead of a queue.
private List<Binding> _bindings = new ArrayList<Binding>();
- private Map<String,Object> _declareArgs = Collections.emptyMap();
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
- protected Node(int nodeType)
+ protected Node(String name)
+ {
+ _name = name;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setNodeType(int nodeType)
{
_nodeType = nodeType;
}
@@ -58,6 +72,16 @@ public abstract class Node
_isDurable = durable;
}
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
public boolean isAutoDelete()
{
return _isAutoDelete;
@@ -100,56 +124,15 @@ public abstract class Node
public void setDeclareArgs(Map<String,Object> options)
{
_declareArgs = options;
- }
-
- public static class QueueNode extends Node
+ }
+
+ public void setExchangeType(String type)
{
- private boolean _isExclusive;
- private QpidQueueOptions _queueOptions = new QpidQueueOptions();
-
- public QueueNode()
- {
- super(AMQDestination.QUEUE_TYPE);
- }
-
- public boolean isExclusive()
- {
- return _isExclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _isExclusive = exclusive;
- }
- }
-
- public static class ExchangeNode extends Node
- {
- private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
- private String _exchangeType;
-
- public ExchangeNode()
- {
- super(AMQDestination.TOPIC_TYPE);
- }
-
- public String getExchangeType()
- {
- return _exchangeType;
- }
-
- public void setExchangeType(String exchangeType)
- {
- _exchangeType = exchangeType;
- }
-
+ _exchangeType = type;
}
-
- public static class UnknownNodeType extends Node
- {
- public UnknownNodeType()
- {
- super(AMQDestination.UNKNOWN_TYPE);
- }
+
+ public String getExchangeType()
+ {
+ return _exchangeType;
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMetho
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.IOException;
@@ -177,6 +179,9 @@ public class AMQProtocolHandler implemen
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -210,48 +215,67 @@ public class AMQProtocolHandler implemen
}
else
{
- _logger.debug("Session closed called with failover state currently " + _failoverState);
-
- // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+ // Use a local variable to record whether fail-over is not allowed,
+ // so that AMQConnection#exceptionReceived can be invoked outside the synchronized block;
+ // otherwise it might deadlock with the failover mutex
+ boolean failoverNotAllowed = false;
+ synchronized (this)
{
- _logger.debug("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.debug("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _logger.debug("Failover not allowed by policy."); // or already in progress?
-
if (_logger.isDebugEnabled())
{
- _logger.debug(_connection.getFailoverPolicy().toString());
+ _logger.debug("Session closed called with failover state " + _failoverState);
}
- if (_failoverState != FailoverState.IN_PROGRESS)
+ // reconnectability was introduced here so as not to disturb the client, as they have made their intentions
+ // known through the policy settings.
+ if (_failoverState == FailoverState.NOT_STARTED)
{
- _logger.debug("sessionClose() not allowed to failover");
- _connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
- _stateManager.getLastException()));
+ // close the sender
+ try
+ {
+ _sender.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occured on closing the sender", e);
+ }
+ if (_connection.failoverAllowed())
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+
+ _logger.debug("FAILOVER STARTING");
+ startFailoverThread();
+ }
+ else if (_connection.isConnected())
+ {
+ failoverNotAllowed = true;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+ }
+ }
+ else
+ {
+ _logger.debug("We are in process of establishing the initial connection");
+ }
}
else
{
- _logger.debug("sessionClose() failover in progress");
+ _logger.debug("Not starting the failover thread as state currently " + _failoverState);
}
}
+
+ if (failoverNotAllowed)
+ {
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+ }
}
- _logger.debug("Protocol Session [" + this + "] closed");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Protocol Session [" + this + "] closed");
+ }
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -280,7 +304,6 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -289,7 +312,7 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -297,14 +320,29 @@ public class AMQProtocolHandler implemen
*/
public void exception(Throwable cause)
{
- if (_failoverState == FailoverState.NOT_STARTED)
+ boolean causeIsAConnectionProblem =
+ cause instanceof AMQConnectionClosedException ||
+ cause instanceof IOException ||
+ cause instanceof TransportException;
+
+ if (causeIsAConnectionProblem)
{
- if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+ //ensure the IoSender and IoReceiver are closed
+ try
{
- _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attempt failover
_network.close();
- closed();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+ FailoverState state = getFailoverState();
+ if (state == FailoverState.NOT_STARTED)
+ {
+ if (causeIsAConnectionProblem)
+ {
+ _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
}
else
{
@@ -319,7 +357,7 @@ public class AMQProtocolHandler implemen
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
+ else if (state == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
@@ -329,6 +367,10 @@ public class AMQProtocolHandler implemen
propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
+ else
+ {
+ _logger.warn("Exception caught by protocol handler: " + cause, cause);
+ }
}
/**
@@ -403,6 +445,7 @@ public class AMQProtocolHandler implemen
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -431,8 +474,6 @@ public class AMQProtocolHandler implemen
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -521,6 +562,7 @@ public class AMQProtocolHandler implemen
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -792,14 +834,14 @@ public class AMQProtocolHandler implemen
return _protocolSession;
}
- FailoverState getFailoverState()
+ synchronized FailoverState getFailoverState()
{
return _failoverState;
}
- public void setFailoverState(FailoverState failoverState)
+ public synchronized void setFailoverState(FailoverState failoverState)
{
- _failoverState = failoverState;
+ _failoverState= failoverState;
}
public byte getProtocolMajorVersion()
@@ -843,6 +885,23 @@ public class AMQProtocolHandler implemen
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ protected Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
@@ -850,7 +909,6 @@ public class AMQProtocolHandler implemen
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -865,5 +923,13 @@ public class AMQProtocolHandler implemen
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Feb 28 16:14:30 2013
@@ -48,6 +48,8 @@ import org.apache.qpid.transport.Transpo
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
+
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -265,7 +267,7 @@ public class AMQProtocolSession implemen
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
@@ -372,6 +374,11 @@ public class AMQProtocolSession implemen
}
}
+ public Sender<ByteBuffer> getSender()
+ {
+ return _protocolHandler.getSender();
+ }
+
public void failover(String host, int port)
{
_protocolHandler.failover(host, port);
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java Thu Feb 28 16:14:30 2013
@@ -28,8 +28,10 @@ import org.apache.qpid.util.FileUtils;
import javax.security.sasl.SaslClientFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.security.Provider;
import java.security.Security;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
@@ -67,10 +69,10 @@ public class DynamicSaslRegistrar
}
/** Reads the properties file, and creates a dynamic security provider to register the SASL implementations with. */
- public static void registerSaslProviders()
+ public static ProviderRegistrationResult registerSaslProviders()
{
_logger.debug("public static void registerSaslProviders(): called");
-
+ ProviderRegistrationResult result = ProviderRegistrationResult.FAILED;
// Open the SASL properties file, using the default name is one is not specified.
String filename = System.getProperty(FILE_PROPERTY);
InputStream is =
@@ -89,22 +91,45 @@ public class DynamicSaslRegistrar
if (factories.size() > 0)
{
// Ensure we are used before the defaults
- if (Security.insertProviderAt(new JCAProvider(factories), 1) == -1)
+ JCAProvider qpidProvider = new JCAProvider(factories);
+ if (Security.insertProviderAt(qpidProvider, 1) == -1)
{
- _logger.error("Unable to load custom SASL providers.");
+ Provider registeredProvider = findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME);
+ if (registeredProvider == null)
+ {
+ result = ProviderRegistrationResult.FAILED;
+ _logger.error("Unable to load custom SASL providers.");
+ }
+ else if (registeredProvider.equals(qpidProvider))
+ {
+ result = ProviderRegistrationResult.EQUAL_ALREADY_REGISTERED;
+ _logger.debug("Custom SASL provider is already registered with equal properties.");
+ }
+ else
+ {
+ result = ProviderRegistrationResult.DIFFERENT_ALREADY_REGISTERED;
+ _logger.warn("Custom SASL provider was already registered with different properties.");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Custom SASL provider " + registeredProvider + " properties: " + new HashMap<Object, Object>(registeredProvider));
+ }
+ }
}
else
{
+ result = ProviderRegistrationResult.SUCCEEDED;
_logger.info("Additional SASL providers successfully registered.");
}
}
else
{
- _logger.warn("No additional SASL providers registered.");
+ result = ProviderRegistrationResult.NO_SASL_FACTORIES;
+ _logger.warn("No additional SASL factories found to register.");
}
}
catch (IOException e)
{
+ result = ProviderRegistrationResult.FAILED;
_logger.error("Error reading properties: " + e, e);
}
finally
@@ -122,6 +147,22 @@ public class DynamicSaslRegistrar
}
}
}
+ return result;
+ }
+
+ static Provider findProvider(String name)
+ {
+ Provider[] providers = Security.getProviders();
+ Provider registeredProvider = null;
+ for (Provider provider : providers)
+ {
+ if (name.equals(provider.getName()))
+ {
+ registeredProvider = provider;
+ break;
+ }
+ }
+ return registeredProvider;
}
/**
@@ -158,15 +199,24 @@ public class DynamicSaslRegistrar
continue;
}
- _logger.debug("Registering class "+ clazz.getName() +" for mechanism "+mechanism);
+ _logger.debug("Found class "+ clazz.getName() +" for mechanism "+mechanism);
factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
}
catch (Exception ex)
{
- _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ _logger.error("Error instantiating SaslClientFactory class " + className + " - skipping");
}
}
return factoriesToRegister;
}
+
+ public static enum ProviderRegistrationResult
+ {
+ SUCCEEDED,
+ EQUAL_ALREADY_REGISTERED,
+ DIFFERENT_ALREADY_REGISTERED,
+ NO_SASL_FACTORIES,
+ FAILED;
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java Thu Feb 28 16:14:30 2013
@@ -39,6 +39,11 @@ import java.util.Map;
*/
public class JCAProvider extends Provider
{
+ static final String QPID_CLIENT_SASL_PROVIDER_NAME = "AMQSASLProvider-Client";
+ static final String QPID_CLIENT_SASL_PROVIDER_INFO = "A JCA provider that registers all "
+ + "AMQ SASL providers that want to be registered";
+ static final double QPID_CLIENT_SASL_PROVIDER_VERSION = 1.0;
+
private static final Logger log = LoggerFactory.getLogger(JCAProvider.class);
/**
@@ -48,8 +53,7 @@ public class JCAProvider extends Provide
*/
public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
{
- super("AMQSASLProvider-Client", 1.0, "A JCA provider that registers all "
- + "AMQ SASL providers that want to be registered");
+ super(QPID_CLIENT_SASL_PROVIDER_NAME, QPID_CLIENT_SASL_PROVIDER_VERSION, QPID_CLIENT_SASL_PROVIDER_INFO);
register(providerMap);
}
@@ -63,7 +67,7 @@ public class JCAProvider extends Provide
for (Map.Entry<String, Class<? extends SaslClientFactory>> me : providerMap.entrySet())
{
put( "SaslClientFactory."+me.getKey(), me.getValue().getName());
- log.debug("Registered SASL Client factory for " + me.getKey() + " as " + me.getValue().getName());
+ log.debug("Recording SASL Client factory for " + me.getKey() + " as " + me.getValue().getName());
}
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Feb 28 16:14:30 2013
@@ -157,12 +157,15 @@ public class AMQStateManager implements
if (_waiters.size() == 0)
{
- _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+ _logger.info("No Waiters for error. Saving as last error:" + error.getMessage());
_lastException = error;
}
for (StateWaiter waiter : _waiters)
{
- _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Notifying waiter " + waiter + " for error:" + error.getMessage());
+ }
waiter.error(error);
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
}
private final ConnectionURL _connectionURL;
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
return null;
}
+
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ {
+ // ClientDelegate simply responds to heartbeats with heartbeats
+ _heartbeatListener.heartbeatReceived();
+ super.connectionHeartbeat(conn, hearbeat);
+ _heartbeatListener.heartbeatSent();
+ }
+
+
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Thu Feb 28 16:14:30 2013
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.jms;
-import org.apache.qpid.framing.AMQShortString;
-
import java.util.List;
+import org.apache.qpid.framing.AMQShortString;
/**
Connection URL format
@@ -35,14 +34,22 @@ public interface ConnectionURL
public static final String AMQ_PROTOCOL = "amqp";
public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
- public static final String OPTIONS_SYNC_ACK = "sync_ack";
+ public static final String OPTIONS_SYNC_ACK = "sync_ack";
public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
+ public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
/**
+ * This option is used to apply a connection level override of
+ * the {@value BrokerDetails#OPTIONS_SSL} option values in the
+ * {@value ConnectionURL#OPTIONS_BROKERLIST}.
+ */
+ public static final String OPTIONS_SSL = "ssl";
+
+ /**
* This option is only applicable for 0-8/0-9/0-9-1 protocols connection
* <p>
* It tells the client to delegate the requeue/DLQ decision to the
@@ -54,9 +61,11 @@ public interface ConnectionURL
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+ public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend";
+
public static final byte URL_0_8 = 1;
public static final byte URL_0_10 = 2;
-
+
String getURL();
String getFailoverMethod();
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Thu Feb 28 16:14:30 2013
@@ -23,25 +23,11 @@ package org.apache.qpid.jms;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import java.io.UnsupportedEncodingException;
/**
*/
public interface MessageProducer extends javax.jms.MessageProducer
{
- /**
- * Set the default MIME type for messages produced by this producer. This reduces the overhead of each message.
- * @param mimeType
- */
- void setMimeType(String mimeType) throws JMSException;
-
- /**
- * Set the default encoding for messages produced by this producer. This reduces the overhead of each message.
- * @param encoding the encoding as understood by XXXX how do I specify this?? RG
- * @throws UnsupportedEncodingException if the encoding is not understood
- */
- void setEncoding(String encoding) throws UnsupportedEncodingException, JMSException;
-
void send(Destination destination, Message message, int deliveryMode,
int priority, long timeToLive, boolean immediate)
throws JMSException;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/jms/Session.java Thu Feb 28 16:14:30 2013
@@ -21,6 +21,7 @@
package org.apache.qpid.jms;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ListMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -100,4 +101,6 @@ public interface Session extends TopicSe
AMQShortString getDefaultTopicExchangeName();
AMQShortString getTemporaryQueueExchangeName();
+
+ ListMessage createListMessage() throws JMSException;
}
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java Thu Feb 28 16:14:30 2013
@@ -20,25 +20,60 @@
*/
package org.apache.qpid.client;
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQInvalidArgumentException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import java.util.concurrent.atomic.AtomicReference;
-public class AMQConnectionUnitTest extends TestCase
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AMQConnectionUnitTest extends QpidTestCase
{
+ String _url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+
+ public void testVerifyQueueOnSendDefault() throws Exception
+ {
+ MockAMQConnection connection = new MockAMQConnection(_url);
+ assertFalse(connection.validateQueueOnSend());
+ }
+
+ public void testVerifyQueueOnSendViaSystemProperty() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true");
+ MockAMQConnection connection = new MockAMQConnection(_url);
+ assertTrue(connection.validateQueueOnSend());
+
+ setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false");
+ connection = new MockAMQConnection(_url);
+ assertFalse(connection.validateQueueOnSend());
+ }
+
+ public void testVerifyQueueOnSendViaURL() throws Exception
+ {
+ MockAMQConnection connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'");
+ assertTrue(connection.validateQueueOnSend());
+
+ connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='false'");
+ assertFalse(connection.validateQueueOnSend());
+ }
+
+ public void testVerifyQueueOnSendViaURLoverridesSystemProperty() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false");
+ MockAMQConnection connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'");
+ assertTrue(connection.validateQueueOnSend());
+ }
public void testExceptionReceived()
{
- String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null);
final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>();
try
{
- MockAMQConnection connection = new MockAMQConnection(url);
+ MockAMQConnection connection = new MockAMQConnection(_url);
connection.setExceptionListener(new ExceptionListener()
{
@@ -62,4 +97,22 @@ public class AMQConnectionUnitTest exten
assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException());
}
+ /**
+ * This should expand to test all the defaults.
+ */
+ public void testDefaultStreamMessageEncoding() throws Exception
+ {
+ MockAMQConnection connection = new MockAMQConnection(_url);
+ assertTrue("Legacy Stream message encoding should be the default",connection.isUseLegacyStreamMessageFormat());
+ }
+
+ /**
+ * This should expand to test all the connection properties.
+ */
+ public void testStreamMessageEncodingProperty() throws Exception
+ {
+ MockAMQConnection connection = new MockAMQConnection(_url + "&use_legacy_stream_msg_format='false'");
+ assertFalse("Stream message encoding should be amqp/list",connection.isUseLegacyStreamMessageFormat());
+ }
+
}
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Feb 28 16:14:30 2013
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.client.message.AMQPEncodedListMessage;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.*;
@@ -28,6 +29,8 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.StreamMessage;
+
import java.util.ArrayList;
import java.util.List;
@@ -276,7 +279,7 @@ public class AMQSession_0_10Test extends
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
null, null, false, true);
- session.sendConsume(consumer, new AMQShortString("test"), null, true, 1);
+ session.sendConsume(consumer, new AMQShortString("test"), true, 1);
}
catch (Exception e)
{
@@ -459,6 +462,13 @@ public class AMQSession_0_10Test extends
assertNotNull("ExchangeDeclare event was not sent", event);
}
+ public void testCreateStreamMessage() throws Exception
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ StreamMessage m = session.createStreamMessage();
+ assertTrue("Legacy Stream message encoding should be the default" + m.getClass(),!(m instanceof AMQPEncodedListMessage));
+ }
+
public void testGetQueueDepthWithSync()
{
// slow down a flush thread
@@ -587,7 +597,7 @@ public class AMQSession_0_10Test extends
connection.setSessionFactory(new SessionFactory()
{
- public Session newSession(Connection conn, Binary name, long expiry)
+ public Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay)
{
return new MockSession(conn, new SessionDelegate(), name, expiry, throwException);
}
@@ -660,7 +670,6 @@ public class AMQSession_0_10Test extends
if (m instanceof ExchangeBound)
{
ExchangeBoundResult struc = new ExchangeBoundResult();
- struc.setQueueNotFound(true);
result.setValue(struc);
}
else if (m instanceof ExchangeQuery)
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java Thu Feb 28 16:14:30 2013
@@ -48,7 +48,7 @@ public class BasicMessageConsumer_0_8_Te
TestAMQSession testSession = new TestAMQSession(conn);
BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
}
@@ -68,7 +68,7 @@ public class BasicMessageConsumer_0_8_Te
final TestAMQSession testSession = new TestAMQSession(conn);
final BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
@@ -94,7 +94,7 @@ public class BasicMessageConsumer_0_8_Te
TestAMQSession testSession = new TestAMQSession(conn);
BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Thu Feb 28 16:14:30 2013
@@ -120,6 +120,48 @@ public class BrokerDetailsTest extends T
{
assertTrue(urise.getReason().equals("Illegal character in port number"));
}
+ }
+
+ public void testToStringMasksKeyStorePassword() throws Exception
+ {
+ String url = "tcp://localhost:5672?key_store_password='password'";
+ BrokerDetails details = new AMQBrokerDetails(url);
+
+ String expectedToString = "tcp://localhost:5672?key_store_password='********'";
+ String actualToString = details.toString();
+
+ assertEquals("Unexpected toString", expectedToString, actualToString);
+ }
+
+ public void testToStringMasksTrustStorePassword() throws Exception
+ {
+ String url = "tcp://localhost:5672?trust_store_password='password'";
+ BrokerDetails details = new AMQBrokerDetails(url);
+
+ String expectedToString = "tcp://localhost:5672?trust_store_password='********'";
+ String actualToString = details.toString();
+
+ assertEquals("Unexpected toString", expectedToString, actualToString);
+ }
+
+ public void testDefaultSsl() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_SSL));
+ }
+
+ public void testOverridingSsl() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672?ssl='true'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+
+ brokerURL = "tcp://localhost:5672?ssl='false''&maxprefetch='1'";
+ broker = new AMQBrokerDetails(brokerURL);
+ assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
}
}
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Thu Feb 28 16:14:30 2013
@@ -30,7 +30,6 @@ import org.apache.qpid.url.URLSyntaxExce
public class ConnectionURLTest extends TestCase
{
-
public void testFailoverURL() throws URLSyntaxException
{
String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
@@ -252,55 +251,47 @@ public class ConnectionURLTest extends T
assertTrue(service.getPort() == 5672);
}
- public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException
+ public void testConnectionURLOptionToStringMasksPassword() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'";
+ String url = "amqp://guest:guest@client/localhost?brokerlist='tcp://localhost:1234'";
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ String expectedToString = "amqp://guest:********@client/localhost?brokerlist='tcp://localhost:1234'";
+ String actualToString = connectionurl.toString();
+ assertEquals("Unexpected toString form", expectedToString, actualToString);
+ }
+
+ public void testConnectionURLOptionToStringMasksSslTrustStorePassword() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?trust_store_password='truststorepassword''";
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
-// ConnectionURL connectionurl = new AMQConnectionURL(url);
-//
-// assertTrue(connectionurl.getFailoverMethod() == null);
-// assertTrue(connectionurl.getUsername().equals("guest"));
-// assertTrue(connectionurl.getPassword().equals("guest"));
-// assertTrue(connectionurl.getVirtualHost().equals("/temp"));
-//
-//
-// assertTrue(connectionurl.getBrokerCount() == 1);
-//
-// BrokerDetails service = connectionurl.getBrokerDetails(0);
-//
-// assertTrue(service.getTransport().equals("tcp"));
-//
-// assertTrue(service.getHost().equals("127.0.0.1"));
-// assertTrue(service.getPort() == 1234);
+ String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?trust_store_password='********''";
+ String actualToString = connectionurl.toString();
+ assertEquals("Unexpected toString form", expectedToString, actualToString);
+ }
+
+ public void testConnectionURLOptionToStringMasksSslKeyStorePassword() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?key_store_password='keystorepassword1';tcp://host:1235?key_store_password='keystorepassword2''";
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?key_store_password='********';tcp://host:1235?key_store_password='********''";
+ String actualToString = connectionurl.toString();
+ assertEquals("Unexpected toString form", expectedToString, actualToString);
}
/**
* Test for QPID-3662 to ensure the {@code toString()} representation is correct.
*/
- public void testConnectionURLOptionToString() throws URLSyntaxException
+ public void testConnectionURLOptionToStringWithMaxPreftech() throws URLSyntaxException
{
String url = "amqp://guest:guest@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
ConnectionURL connectionurl = new AMQConnectionURL(url);
- assertNull(connectionurl.getFailoverMethod());
- assertEquals("guest", connectionurl.getUsername());
- assertEquals("guest", connectionurl.getPassword());
- assertEquals("client", connectionurl.getClientName());
- assertEquals("/localhost", connectionurl.getVirtualHost());
- assertEquals("1", connectionurl.getOption("maxprefetch"));
- assertTrue(connectionurl.getBrokerCount() == 1);
-
- BrokerDetails service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
- assertTrue(service.getHost().equals("localhost"));
- assertTrue(service.getPort() == 1234);
- assertTrue(service.getProperties().containsKey("tcp_nodelay"));
- assertEquals("true", service.getProperties().get("tcp_nodelay"));
-
- String nopasswd = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
- String tostring = connectionurl.toString();
- assertEquals(tostring.indexOf("maxprefetch"), tostring.lastIndexOf("maxprefetch"));
- assertEquals(nopasswd, tostring);
+ String expectedToString = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
+ String actualToString = connectionurl.toString();
+ assertEquals("Unexpected toString form", expectedToString, actualToString);
}
public void testSingleTransportMultiOptionURL() throws URLSyntaxException
@@ -572,9 +563,64 @@ public class ConnectionURLTest extends T
connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
}
- public static junit.framework.Test suite()
+ /**
+ * Verify that when the ssl option is not specified, asking for the option returns null,
+ * such that this can later be used to verify it wasn't specified.
+ */
+ public void testDefaultSsl() throws URLSyntaxException
{
- return new junit.framework.TestSuite(ConnectionURLTest.class);
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL));
+ }
+
+ /**
+ * Verify that when the ssl option is specified, asking for the option returns the value,
+ * such that this can later be used to verify what value it was specified as.
+ */
+ public void testOverridingSsl() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='true'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+
+ url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='false'";
+ connectionURL = new AMQConnectionURL(url);
+
+ assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+ }
+
+ /**
+ * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is not
+ * specified, asking for the option returns null, such that this can later be used to
+ * verify it wasn't specified.
+ */
+ public void testDefaultVerifyQueueOnSend() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertNull("default verifyQueueOnSend value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+ }
+
+ /**
+ * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is
+ * specified, asking for the option returns the value, such that this can later be used
+ * to verify what value it was specified as.
+ */
+ public void testOverridingVerifyQueueOnSend() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='true'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)));
+
+ url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='false'";
+ connectionURL = new AMQConnectionURL(url);
+
+ assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)));
}
}
Modified: qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/asyncstore/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Feb 28 16:14:30 2013
@@ -193,6 +193,126 @@ public class DestinationURLTest extends
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+ public void testExchangeOptionsNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertFalse(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeAutoDeleteOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeDurableOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeInternalOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_INTERNAL + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeInternal());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeAutoDelete());
+ }
+
public void testRejectBehaviourPresent() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org