You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 17:12:09 UTC
svn commit: r632040 [2/2] - in /incubator/qpid/branches/thegreatmerge: ./
qpid/cpp/ qpid/cpp/rubygen/ qpid/cpp/rubygen/0-10/ qpid/cpp/rubygen/99-0/
qpid/cpp/rubygen/templates/ qpid/cpp/src/ qpid/cpp/src/qpid/
qpid/cpp/src/qpid/amqp_0_10/ qpid/cpp/src/q...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Feb 28 08:11:52 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
import javax.jms.Destination;
import javax.naming.NamingException;
import javax.naming.Reference;
@@ -31,7 +33,6 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
public abstract class AMQDestination implements Destination, Referenceable
@@ -50,6 +51,8 @@
private AMQShortString _routingKey;
+ private AMQShortString[] _bindingKeys;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -64,7 +67,7 @@
public static final Integer TOPIC_TYPE = Integer.valueOf(2);
public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
- protected AMQDestination(String url) throws URLSyntaxException
+ protected AMQDestination(String url) throws URISyntaxException
{
this(new AMQBindingURL(url));
}
@@ -79,26 +82,43 @@
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
_routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
+ _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, routingKey, false, false, queueName);
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName, null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys)
+ {
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
- this(exchangeName, exchangeClass, destinationName, false, false, null);
+ this(exchangeName, exchangeClass, destinationName, false, false, null,null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName)
+ {
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName)
+ boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys)
{
- this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable){
+ this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
{
// If used with a fannout exchange, the routing key can be null
if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
@@ -120,6 +140,7 @@
_isAutoDelete = isAutoDelete;
_queueName = queueName;
_isDurable = isDurable;
+ _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
}
public AMQShortString getEncodedName()
@@ -181,6 +202,20 @@
return _routingKey;
}
+ public AMQShortString[] getBindingKeys()
+ {
+ if (_bindingKeys != null && _bindingKeys.length > 0)
+ {
+ return _bindingKeys;
+ }
+ else
+ {
+ // catering to the common use case where the
+ //routingKey is the same as the bindingKey.
+ return new AMQShortString[]{_routingKey};
+ }
+ }
+
public boolean isExclusive()
{
return _isExclusive;
@@ -237,6 +272,21 @@
sb.append("='");
sb.append(_routingKey).append("'");
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ // We can't allow both routingKey and bindingKey
+ if (_routingKey == null && _bindingKeys != null && _bindingKeys.length>0)
+ {
+
+ for (AMQShortString bindingKey:_bindingKeys)
+ {
+ sb.append(BindingURL.OPTION_BINDING_KEY);
+ sb.append("='");
+ sb.append(bindingKey);
+ sb.append("'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+
+ }
}
if (_isDurable)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Thu Feb 28 08:11:52 2008
@@ -61,8 +61,14 @@
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
{
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
- false, queueName, false); }
+ false, queueName, false);
+ }
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false,bindingKeys);
+ }
/**
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
@@ -126,11 +132,15 @@
this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
}
-
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
+ this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null);
+ }
+
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+ {
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
- autoDelete, queueName, durable);
+ autoDelete, queueName, durable, bindingKeys);
}
public AMQShortString getRoutingKey()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 08:11:52 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.io.Serializable;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
@@ -86,7 +87,6 @@
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -564,19 +564,14 @@
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException
+ final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments);
-
- AMQFrame queueBind = body.generateFrame(_channelId);
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
-
+ sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
return null;
}
}, _connection).execute();
@@ -587,12 +582,12 @@
{
if( consumer.getQueuename() != null)
{
- bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
}
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException;
+ final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
/**
@@ -1036,7 +1031,7 @@
{
return new AMQQueue(new AMQBindingURL(queueName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1253,7 +1248,7 @@
{
return new AMQTopic(new AMQBindingURL(topicName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1380,6 +1375,16 @@
}
}
+ public void declareAndBind(AMQDestination amqd)
+ throws
+ AMQException
+ {
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
+ declareExchange(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+ }
+
/**
* Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
*
@@ -1820,35 +1825,10 @@
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
- {
- try
- {
- AMQMethodEvent response =
- new FailoverRetrySupport<AMQMethodEvent, AMQException>(
- new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
- {
- public AMQMethodEvent execute() throws AMQException, FailoverException
- {
- ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName);
- AMQFrame boundFrame = body.generateFrame(_channelId);
-
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
-
- }
- }, _connection).execute();
-
- // Extract and return the response code from the query.
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-
- return (responseBody.getReplyCode() == 0);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
- }
- }
+ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException;
+
+ public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -2509,16 +2489,6 @@
}
}
- public void declareAndBind(AMQDestination amqd)
- throws
- AMQException
- {
- AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
- AMQShortString queueName = declareQueue(amqd, protocolHandler);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
- }
-
/**
* Callers must hold the failover mutex before calling this method.
*
@@ -2540,7 +2510,7 @@
consumer.setQueuename(queueName);
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 28 08:11:52 2008
@@ -41,6 +41,7 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
@@ -159,7 +160,7 @@
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-
+
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -213,7 +214,7 @@
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName)
+ final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -222,7 +223,12 @@
{
args.put("x-match", "any");
}
- getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args);
+
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
+ getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+ }
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -238,6 +244,7 @@
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ getQpidSession().sync();
getQpidSession().sessionClose();
getCurrentException();
}
@@ -350,19 +357,37 @@
/**
* Bind a queue with an exchange.
*/
- public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
- final AMQShortString routingKey) throws JMSException
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
+ {
+ return isQueueBound(exchangeName,queueName,routingKey,null);
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
+ }
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
+ throws JMSException
{
String rk = "";
boolean res;
- if (routingKey != null)
+ if (bindingKeys != null && bindingKeys.length>0)
+ {
+ rk = bindingKeys[0].toString();
+ }
+ else if (routingKey != null)
{
rk = routingKey.toString();
}
+
Future<BindingQueryResult> result =
- getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null);
+ getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null);
BindingQueryResult bindingQueryResult = result.get();
- if (routingKey == null)
+
+ if (rk == null)
{
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
@@ -577,7 +602,7 @@
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
- sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+ sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
catch (Exception e)
@@ -701,7 +726,7 @@
AMQShortString topicName;
if (topic instanceof AMQTopic)
{
- topicName=((AMQTopic) topic).getRoutingKey();
+ topicName=((AMQTopic) topic).getBindingKeys()[0];
}
else
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 08:11:52 2008
@@ -94,7 +94,7 @@
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -169,6 +169,11 @@
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Feb 28 08:11:52 2008
@@ -49,6 +49,11 @@
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
}
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys);
+ }
+
public AMQTopic(AMQConnection conn, String routingKey)
{
this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
@@ -77,6 +82,11 @@
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
+ }
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 08:11:52 2008
@@ -51,7 +51,7 @@
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- private final String _messageSelector;
+ protected final String _messageSelector;
private final boolean _noLocal;
@@ -740,6 +740,8 @@
}
else
{
+ // we should not be allowed to add a message if the
+ // consumer is closed
_synchronousQueue.put(jmsMessage);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 08:11:52 2008
@@ -38,7 +38,6 @@
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.Iterator;
/**
@@ -122,8 +121,10 @@
}
catch (AMQException e)
{
+ _logger.error("Receivecd an Exception when receiving message",e);
try
{
+
getSession().getAMQConnection().getExceptionListener()
.onException(new JMSAMQException("Error when receiving message", e));
}
@@ -135,6 +136,7 @@
}
if (messageOk)
{
+ _logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
}
@@ -290,7 +292,7 @@
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (getMessageSelector() != null && !getMessageSelector().equals(""))
+ if (_messageSelector != null && !_messageSelector.equals(""))
{
messageOk = _filter.matches(message);
}
@@ -332,6 +334,9 @@
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
+ _logger.debug("filterMessage - *************************************");
+ _logger.debug("filterMessage - message acquire status : " + messageOk);
+ _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -393,13 +398,29 @@
_0_10session.getQpidSession()
.messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+
+ _logger.debug("acquireMessage, sent acquire message to broker");
+
_0_10session.getQpidSession().sync();
+
+ _logger.debug("acquireMessage, returned from sync");
+
RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+
+ _logger.debug("acquireMessage, acquired range set " + acquired);
+
if (acquired != null && acquired.size() > 0)
{
result = true;
}
+
+ _logger.debug("acquireMessage, Trying to get current exception ");
+
_0_10session.getCurrentException();
+
+ _logger.debug("acquireMessage, returned from getting current exception ");
+
+ _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " );
}
return result;
}
@@ -473,4 +494,4 @@
_session.rejectMessage(message, true);
}
}
-}
\ No newline at end of file
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Feb 28 08:11:52 2008
@@ -17,22 +17,22 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.transport.DeliveryProperties;
-
-import javax.jms.Message;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.ReplyTo;
/**
* This is a 0_10 message producer.
@@ -154,12 +154,20 @@
String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
+ if(_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("\n==========================");
+ b.append("\nReplyTo : " + replyToURL);
+ b.append("\n==========================");
+ _logger.debug(b.toString());
+ }
AMQBindingURL dest;
try
{
dest = new AMQBindingURL(replyToURL);
}
- catch (URLSyntaxException e)
+ catch (URISyntaxException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
@@ -198,8 +206,7 @@
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
- destination.getRoutingKey());
+ return _session.isQueueBound(destination);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Feb 28 08:11:52 2008
@@ -20,30 +20,34 @@
*/
package org.apache.qpid.client.message;
-import org.apache.commons.collections.map.ReferenceMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
-import org.apache.mina.common.ByteBuffer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-import java.io.IOException;
-import java.net.URISyntaxException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Feb 28 08:11:52 2008
@@ -140,8 +140,7 @@
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
- message.receivedFromServer();
+ AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
return message;
}
@@ -152,7 +151,7 @@
{
final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
-
+ msg.receivedFromServer();
return msg;
}
@@ -164,7 +163,7 @@
final AbstractJMSMessage msg =
create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
msg.setJMSRedelivered(redelivered);
-
+ msg.receivedFromServer();
return msg;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Feb 28 08:11:52 2008
@@ -20,6 +20,24 @@
*/
package org.apache.qpid.jndi;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQHeadersExchange;
@@ -30,28 +48,9 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class);
@@ -184,6 +183,17 @@
Topic t = createTopic(entry.getValue().toString());
if (t != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("Creating the topic: " + jndiName + " with the following binding keys ");
+ for (AMQShortString binding:((AMQTopic)t).getBindingKeys())
+ {
+ b.append(binding.toString()).append(",");
+ }
+
+ _logger.debug(b.toString());
+ }
data.put(jndiName, t);
}
}
@@ -219,7 +229,7 @@
}
catch (URISyntaxException urlse)
{
- _logger.warn("Unable to destination:" + urlse);
+ _logger.warn("Unable to create destination:" + urlse, urlse);
return null;
}
@@ -268,7 +278,17 @@
}
else if (value instanceof String)
{
- return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
+ String[] keys = ((String)value).split(",");
+ AMQShortString[] bindings = new AMQShortString[keys.length];
+ int i = 0;
+ for (String key:keys)
+ {
+ bindings[i] = new AMQShortString(key);
+ i++;
+ }
+ // The Destination has a dual nature. If it is used for a producer, the key is used
+ // as the routing key. If it is used for a consumer, it becomes the binding key.
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings);
}
else if (value instanceof BindingURL)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Feb 28 08:11:52 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,12 +23,18 @@
import junit.framework.TestCase;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.unit.basic.PropertyValueTest;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
public class DestinationURLTest extends TestCase
{
- public void testFullURL() throws URLSyntaxException
+ private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class);
+
+ public void testFullURL() throws URISyntaxException
{
String url = "exchange.Class://exchangeName/Destination/Queue";
@@ -43,7 +49,7 @@
assertTrue(dest.getQueueName().equals("Queue"));
}
- public void testQueue() throws URLSyntaxException
+ public void testQueue() throws URISyntaxException
{
String url = "exchangeClass://exchangeName//Queue";
@@ -58,7 +64,7 @@
assertTrue(dest.getQueueName().equals("Queue"));
}
- public void testQueueWithOption() throws URLSyntaxException
+ public void testQueueWithOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName//Queue?option='value'";
@@ -75,7 +81,7 @@
}
- public void testDestination() throws URLSyntaxException
+ public void testDestination() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/";
@@ -90,7 +96,7 @@
assertTrue(dest.getQueueName().equals(""));
}
- public void testDestinationWithOption() throws URLSyntaxException
+ public void testDestinationWithOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/?option='value'";
@@ -107,7 +113,7 @@
assertTrue(dest.getOption("option").equals("value"));
}
- public void testDestinationWithMultiOption() throws URLSyntaxException
+ public void testDestinationWithMultiOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/?option='value',option2='value2'";
@@ -123,7 +129,7 @@
assertTrue(dest.getOption("option2").equals("value2"));
}
- public void testDestinationWithNoExchangeDefaultsToDirect() throws URLSyntaxException
+ public void testDestinationWithNoExchangeDefaultsToDirect() throws URISyntaxException
{
String url = "IBMPerfQueue1?durable='true'";
@@ -136,6 +142,41 @@
assertTrue(dest.getQueueName().equals("IBMPerfQueue1"));
assertTrue(dest.getOption("durable").equals("true"));
+ }
+
+ public void testDestinationWithMultiBindingKeys() throws URISyntaxException
+ {
+
+ String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'";
+
+ AMQBindingURL dest = new AMQBindingURL(url);
+
+ assertTrue(dest.getExchangeClass().equals("exchangeClass"));
+ assertTrue(dest.getExchangeName().equals("exchangeName"));
+ assertTrue(dest.getDestinationName().equals("Destination"));
+ assertTrue(dest.getQueueName().equals(""));
+
+ assertTrue(dest.getBindingKeys().length == 2);
+ }
+
+ // You can specify either a routing key or a binding key, but not both.
+ public void testDestinationIfOnlyRoutingKeyOrBindingKeyIsSpecified() throws URISyntaxException
+ {
+
+ String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',routingkey='key2'";
+ boolean exceptionThrown = false;
+ try
+ {
+
+ AMQBindingURL dest = new AMQBindingURL(url);
+ }
+ catch(URISyntaxException e)
+ {
+ exceptionThrown = true;
+ _logger.info("Exception thrown",e);
+ }
+
+ assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown);
}
public static junit.framework.Test suite()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java Thu Feb 28 08:11:52 2008
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.url;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
public class AMQBindingURL implements BindingURL
{
private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
String _url;
- AMQShortString _exchangeClass;
- AMQShortString _exchangeName;
- AMQShortString _destinationName;
- AMQShortString _queueName;
+ AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+ AMQShortString _exchangeName = new AMQShortString("");
+ AMQShortString _destinationName = new AMQShortString("");;
+ AMQShortString _queueName = new AMQShortString("");
+ AMQShortString[] _bindingKeys = new AMQShortString[0];
private HashMap<String, String> _options;
- public AMQBindingURL(String url) throws URLSyntaxException
+ public AMQBindingURL(String url) throws URISyntaxException
{
// format:
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
@@ -52,116 +53,35 @@
parseBindingURL();
}
- private void parseBindingURL() throws URLSyntaxException
+ private void parseBindingURL() throws URISyntaxException
{
- try
- {
- URI connection = new URI(_url);
-
- String exchangeClass = connection.getScheme();
-
- if (exchangeClass == null)
- {
- _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url;
- // URLHelper.parseError(-1, "Exchange Class not specified.", _url);
- parseBindingURL();
-
- return;
- }
- else
- {
- setExchangeClass(exchangeClass);
- }
-
- String exchangeName = connection.getHost();
-
- if (exchangeName == null)
- {
- if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
- {
- setExchangeName("");
- }
- else
- {
- throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
- }
- }
- else
- {
- setExchangeName(exchangeName);
- }
-
- String queueName;
-
- if ((connection.getPath() == null) || connection.getPath().equals(""))
- {
- throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
- "Destination or Queue requried", _url);
- }
- else
- {
- int slash = connection.getPath().indexOf("/", 1);
- if (slash == -1)
- {
- throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
- "Destination requried", _url);
- }
- else
- {
- String path = connection.getPath();
- setDestinationName(path.substring(1, slash));
-
- // We don't set queueName yet as the actual value we use depends on options set
- // when we are dealing with durable subscriptions
-
- queueName = path.substring(slash + 1);
-
- }
- }
-
- URLHelper.parseOptions(_options, connection.getQuery());
-
- processOptions();
-
- // We can now call setQueueName as the URL is full parsed.
-
- setQueueName(queueName);
-
- // Fragment is #string (not used)
- _logger.debug("URL Parsed: " + this);
-
- }
- catch (URISyntaxException uris)
- {
-
- throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
-
- }
+ BindingURLParser parser = new BindingURLParser(_url,this);
+ processOptions();
+ _logger.debug("URL Parsed: " + this);
}
- private void setExchangeClass(String exchangeClass)
+ public void setExchangeClass(String exchangeClass)
{
setExchangeClass(new AMQShortString(exchangeClass));
}
- private void setQueueName(String name) throws URLSyntaxException
+ public void setQueueName(String name)
{
setQueueName(new AMQShortString(name));
}
- private void setDestinationName(String name)
+ public void setDestinationName(String name)
{
setDestinationName(new AMQShortString(name));
}
- private void setExchangeName(String exchangeName)
+ public void setExchangeName(String exchangeName)
{
setExchangeName(new AMQShortString(exchangeName));
}
- private void processOptions()
+ private void processOptions() throws URISyntaxException
{
- // this is where we would parse any options that needed more than just storage.
}
public String getURL()
@@ -210,34 +130,9 @@
return _queueName;
}
- public void setQueueName(AMQShortString name) throws URLSyntaxException
+ public void setQueueName(AMQShortString name)
{
- if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
- {
- if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
- {
- if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
- {
- _queueName =
- new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
- }
- else
- {
- throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID
- + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
-
- }
- }
- else
- {
- _queueName = null;
- }
- }
- else
- {
- _queueName = name;
- }
-
+ _queueName = name;
}
public String getOption(String key)
@@ -261,7 +156,7 @@
{
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+ return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
}
else
{
@@ -271,12 +166,29 @@
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+ return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
}
return getDestinationName();
}
+ public AMQShortString[] getBindingKeys()
+ {
+ if (_bindingKeys != null && _bindingKeys.length>0)
+ {
+ return _bindingKeys;
+ }
+ else
+ {
+ return new AMQShortString[]{getRoutingKey()};
+ }
+ }
+
+ public void setBindingKeys(AMQShortString[] keys)
+ {
+ _bindingKeys = keys;
+ }
+
public void setRoutingKey(AMQShortString key)
{
setOption(OPTION_ROUTING_KEY, key.toString());
@@ -296,6 +208,29 @@
sb.append(URLHelper.printOptions(_options));
- return sb.toString();
+ // temp hack
+ if (getRoutingKey() == null || getRoutingKey().toString().equals(""))
+ {
+
+ if (sb.toString().indexOf("?") == -1)
+ {
+ sb.append("?");
+ }
+ else
+ {
+ sb.append("&");
+ }
+
+ for (AMQShortString key :_bindingKeys)
+ {
+ sb.append(BindingURL.OPTION_BINDING_KEY).append("='").append(key.toString()).append("'&");
+ }
+
+ return sb.toString().substring(0,sb.toString().length()-1);
+ }
+ else
+ {
+ return sb.toString();
+ }
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Thu Feb 28 08:11:52 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,6 +34,7 @@
public static final String OPTION_CLIENTID = "clientid";
public static final String OPTION_SUBSCRIPTION = "subscription";
public static final String OPTION_ROUTING_KEY = "routingkey";
+ public static final String OPTION_BINDING_KEY = "bindingkey";
String getURL();
@@ -51,6 +52,8 @@
boolean containsOption(String key);
AMQShortString getRoutingKey();
+
+ AMQShortString[] getBindingKeys();
String toString();
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/module.xml?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/module.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/module.xml Thu Feb 28 08:11:52 2008
@@ -158,6 +158,7 @@
</target>
<property name="test" value="*Test"/>
+ <property name="test.mem" value="512M"/>
<property name="log" value="info"/>
<property name="amqj.logging.level" value="${log}"/>
@@ -189,6 +190,8 @@
<target name="test" depends="compile-tests" if="module.test.src.exists"
description="execute unit tests">
<junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000">
+
+ <jvmarg value="-Xmx${test.mem}" />
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/mllib/dom.py Thu Feb 28 08:11:52 2008
@@ -72,7 +72,7 @@
cls = cls.base
return False
- def dispatch(self, f):
+ def dispatch(self, f, attrs = ""):
cls = self
while cls != None:
if hasattr(f, cls.type):
@@ -81,7 +81,6 @@
cls = cls.base
cls = self
- attrs = ""
while cls != None:
if attrs:
sep = ", "
@@ -151,9 +150,10 @@
def dispatch(self, f):
try:
- method = getattr(f, "do_" + self.name)
+ attr = "do_" + self.name
+ method = getattr(f, attr)
except AttributeError:
- return Dispatcher.dispatch(self, f)
+ return Dispatcher.dispatch(self, f, "'%s'" % attr)
return method(self)
class Leaf(Component, Dispatcher):
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/qpid/queue.py Thu Feb 28 08:11:52 2008
@@ -24,36 +24,51 @@
"""
from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
class Closed(Exception): pass
class Queue(BaseQueue):
END = object()
+ STOP = object()
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
- self._real_put = self.put
- self.listener = self._real_put
+ self.listener = None
+ self.thread = None
def close(self):
self.put(Queue.END)
def get(self, block = True, timeout = None):
- self.put = self._real_put
- try:
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
- finally:
- self.put = self.listener
- pass
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
def listen(self, listener):
self.listener = listener
- self.put = self.listener
+ if listener == None:
+ if self.thread != None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+ else:
+ if self.thread == None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed:
+ break
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py?rev=632040&r1=632039&r2=632040&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests/queue.py Thu Feb 28 08:11:52 2008
@@ -30,37 +30,32 @@
# all the queue functionality.
def test_listen(self):
- LISTEN = object()
- GET = object()
- EMPTY = object()
+ values = []
+ heard = threading.Event()
+ def listener(x):
+ values.append(x)
+ heard.set()
q = Queue(0)
- values = []
- q.listen(lambda x: values.append((LISTEN, x)))
+ q.listen(listener)
+ heard.clear()
q.put(1)
- assert values[-1] == (LISTEN, 1)
+ heard.wait()
+ assert values[-1] == 1
+ heard.clear()
q.put(2)
- assert values[-1] == (LISTEN, 2)
-
- class Getter(threading.Thread):
+ heard.wait()
+ assert values[-1] == 2
- def run(self):
- try:
- values.append((GET, q.get(timeout=10)))
- except Empty:
- values.append(EMPTY)
-
- g = Getter()
- g.start()
- # let the other thread reach the get
- time.sleep(2)
+ q.listen(None)
q.put(3)
- g.join()
-
- assert values[-1] == (GET, 3)
+ assert q.get(3) == 3
+ q.listen(listener)
+ heard.clear()
q.put(4)
- assert values[-1] == (LISTEN, 4)
+ heard.wait()
+ assert values[-1] == 4
def test_close(self):
q = Queue(0)