You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2008/02/27 06:15:21 UTC
svn commit: r631486 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid:
client/ client/message/ jndi/
Author: rajith
Date: Tue Feb 26 21:15:20 2008
New Revision: 631486
URL: http://svn.apache.org/viewvc?rev=631486&view=rev
Log:
This contains the ground work for QPID-803.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Tue Feb 26 21:15:20 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
import javax.jms.Destination;
import javax.naming.NamingException;
import javax.naming.Reference;
@@ -31,7 +33,6 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
public abstract class AMQDestination implements Destination, Referenceable
@@ -50,6 +51,8 @@
private AMQShortString _routingKey;
+ private AMQShortString[] _bindingKeys;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -64,7 +67,7 @@
public static final Integer TOPIC_TYPE = Integer.valueOf(2);
public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
- protected AMQDestination(String url) throws URLSyntaxException
+ protected AMQDestination(String url) throws URISyntaxException
{
this(new AMQBindingURL(url));
}
@@ -79,26 +82,43 @@
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
_routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
+ _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, routingKey, false, false, queueName);
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName, null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys)
+ {
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
- this(exchangeName, exchangeClass, destinationName, false, false, null);
+ this(exchangeName, exchangeClass, destinationName, false, false, null,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName)
+ boolean isAutoDelete, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable){
+ this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
{
// If used with a fannout exchange, the routing key can be null
if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
@@ -120,6 +140,7 @@
_isAutoDelete = isAutoDelete;
_queueName = queueName;
_isDurable = isDurable;
+ _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
}
public AMQShortString getEncodedName()
@@ -181,6 +202,20 @@
return _routingKey;
}
+ public AMQShortString[] getBindingKeys()
+ {
+ if (_bindingKeys != null && _bindingKeys.length > 0)
+ {
+ return _bindingKeys;
+ }
+ else
+ {
+ // catering to the common use case where the
+ //routingKey is the same as the bindingKey.
+ return new AMQShortString[]{_routingKey};
+ }
+ }
+
public boolean isExclusive()
{
return _isExclusive;
@@ -236,6 +271,21 @@
sb.append(BindingURL.OPTION_ROUTING_KEY);
sb.append("='");
sb.append(_routingKey).append("'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_bindingKeys != null && _bindingKeys.length>0)
+ {
+ sb.append(BindingURL.OPTION_BINDING_KEY);
+ sb.append("='");
+ for (AMQShortString bindingKey:_bindingKeys)
+ {
+
+ sb.append(bindingKey).append(",");
+
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append("'");
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Tue Feb 26 21:15:20 2008
@@ -61,8 +61,14 @@
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
{
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
- false, queueName, false); }
+ false, queueName, false);
+ }
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false,bindingKeys);
+ }
/**
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
@@ -126,11 +132,15 @@
this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
}
-
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
+ this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null);
+ }
+
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+ {
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
- autoDelete, queueName, durable);
+ autoDelete, queueName, durable, bindingKeys);
}
public AMQShortString getRoutingKey()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 26 21:15:20 2008
@@ -22,6 +22,7 @@
import java.io.Serializable;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
@@ -29,6 +30,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@@ -80,12 +82,9 @@
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -407,14 +406,14 @@
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException
+ final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName,routingKey,arguments,exchangeName);
+ sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
return null;
}
}, _connection).execute();
@@ -425,12 +424,12 @@
{
if( consumer.getQueuename() != null)
{
- bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
}
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException;
+ final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
/**
@@ -820,7 +819,7 @@
{
return new AMQQueue(new AMQBindingURL(queueName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1031,7 +1030,7 @@
{
return new AMQTopic(new AMQBindingURL(topicName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1165,7 +1164,7 @@
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
}
/**
@@ -1556,6 +1555,9 @@
public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException;
+
+ public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
+
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
* when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
@@ -2111,7 +2113,7 @@
consumer.setQueuename(queueName);
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Feb 26 21:15:20 2008
@@ -41,6 +41,7 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
@@ -159,7 +160,7 @@
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-
+
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -213,7 +214,7 @@
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName)
+ final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -222,7 +223,12 @@
{
args.put("x-match", "any");
}
- getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args);
+
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
+ getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+ }
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -238,6 +244,7 @@
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ getQpidSession().sync();
getQpidSession().sessionClose();
getCurrentException();
}
@@ -350,19 +357,37 @@
/**
* Bind a queue with an exchange.
*/
- public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
- final AMQShortString routingKey) throws JMSException
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
+ {
+ return isQueueBound(exchangeName,queueName,routingKey,null);
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
+ }
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
+ throws JMSException
{
String rk = "";
boolean res;
- if (routingKey != null)
+ if (bindingKeys != null && bindingKeys.length>0)
+ {
+ rk = bindingKeys[0].toString();
+ }
+ else if (routingKey != null)
{
rk = routingKey.toString();
}
+
Future<BindingQueryResult> result =
- getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null);
+ getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null);
BindingQueryResult bindingQueryResult = result.get();
- if (routingKey == null)
+
+ if (rk == null)
{
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
@@ -577,7 +602,7 @@
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
- sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+ sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
catch (Exception e)
@@ -701,7 +726,7 @@
AMQShortString topicName;
if (topic instanceof AMQTopic)
{
- topicName=((AMQTopic) topic).getRoutingKey();
+ topicName=((AMQTopic) topic).getBindingKeys()[0];
}
else
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Feb 26 21:15:20 2008
@@ -94,7 +94,7 @@
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
{
AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
exchangeName, // exchange
@@ -176,6 +176,11 @@
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Tue Feb 26 21:15:20 2008
@@ -49,6 +49,11 @@
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
}
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys);
+ }
+
public AMQTopic(AMQConnection conn, String routingKey)
{
this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
@@ -77,6 +82,11 @@
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
+ }
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Feb 26 21:15:20 2008
@@ -121,8 +121,10 @@
}
catch (AMQException e)
{
+ _logger.error("Receivecd an Exception when receiving message",e);
try
{
+
getSession().getAMQConnection().getExceptionListener()
.onException(new JMSAMQException("Error when receiving message", e));
}
@@ -134,6 +136,7 @@
}
if (messageOk)
{
+ _logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage, channelId);
}
}
@@ -331,6 +334,9 @@
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
+ _logger.debug("filterMessage - *************************************");
+ _logger.debug("filterMessage - message acquire status : " + messageOk);
+ _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -392,13 +398,29 @@
_0_10session.getQpidSession()
.messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+
+ _logger.debug("acquireMessage, sent acquire message to broker");
+
_0_10session.getQpidSession().sync();
+
+ _logger.debug("acquireMessage, returned from sync");
+
RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+
+ _logger.debug("acquireMessage, acquired range set " + acquired);
+
if (acquired != null && acquired.size() > 0)
{
result = true;
}
+
+ _logger.debug("acquireMessage, Trying to get current exception ");
+
_0_10session.getCurrentException();
+
+ _logger.debug("acquireMessage, returned from getting current exception ");
+
+ _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " );
}
return result;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Feb 26 21:15:20 2008
@@ -17,22 +17,22 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.transport.DeliveryProperties;
-
-import javax.jms.Message;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.ReplyTo;
/**
* This is a 0_10 message producer.
@@ -154,12 +154,20 @@
String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
+ if(_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("\n==========================");
+ b.append("\nReplyTo : " + replyToURL);
+ b.append("\n==========================");
+ _logger.debug(b.toString());
+ }
AMQBindingURL dest;
try
{
dest = new AMQBindingURL(replyToURL);
}
- catch (URLSyntaxException e)
+ catch (URISyntaxException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
@@ -198,8 +206,7 @@
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
- destination.getRoutingKey());
+ return _session.isQueueBound(destination);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Feb 26 21:15:20 2008
@@ -20,29 +20,34 @@
*/
package org.apache.qpid.client.message;
-import org.apache.commons.collections.map.ReferenceMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
-import org.apache.mina.common.ByteBuffer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-import java.io.IOException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
@@ -222,7 +227,7 @@
BindingURL binding = new AMQBindingURL(replyToEncoding);
dest = AMQDestination.createDestination(binding);
}
- catch (URLSyntaxException e)
+ catch (URISyntaxException e)
{
throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=631486&r1=631485&r2=631486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Feb 26 21:15:20 2008
@@ -20,6 +20,24 @@
*/
package org.apache.qpid.jndi;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQHeadersExchange;
@@ -30,27 +48,9 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class);
@@ -99,7 +99,7 @@
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
}
-
+
createConnectionFactories(data, environment);
@@ -185,6 +185,17 @@
Topic t = createTopic(entry.getValue().toString());
if (t != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("Creating the topic: " + jndiName + " with the following binding keys ");
+ for (AMQShortString binding:((AMQTopic)t).getBindingKeys())
+ {
+ b.append(binding.toString()).append(",");
+ }
+
+ _logger.debug(b.toString());
+ }
data.put(jndiName, t);
}
}
@@ -218,9 +229,9 @@
{
binding = new AMQBindingURL(bindingURL);
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
- _logger.warn("Unable to destination:" + urlse);
+ _logger.warn("Unable to create destination:" + urlse, urlse);
return null;
}
@@ -269,7 +280,17 @@
}
else if (value instanceof String)
{
- return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
+ String[] keys = ((String)value).split(",");
+ AMQShortString[] bindings = new AMQShortString[keys.length];
+ int i = 0;
+ for (String key:keys)
+ {
+ bindings[i] = new AMQShortString(key);
+ i++;
+ }
+ // The Destination has a dual nature. If this was used for a producer the key is used
+ // for the routing key. If it was used for the consumer it becomes the bindingKey
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings);
}
else if (value instanceof BindingURL)
{