You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/28 16:31:52 UTC
svn commit: r1621143 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid: client/
client/message/ client/messaging/address/ jndi/
Author: rgodfrey
Date: Thu Aug 28 14:31:52 2014
New Revision: 1621143
URL: http://svn.apache.org/r1621143
Log:
QPID-6052 : Use ADDR addresses for JMSDestination on incoming messages in 0-9-1 when the address mode is ADDR
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java Thu Aug 28 14:31:52 2014
@@ -58,7 +58,7 @@ public class AMQAnyDestination extends A
super(str);
}
- public AMQAnyDestination(Address addr) throws Exception
+ public AMQAnyDestination(Address addr)
{
super(addr);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Aug 28 14:31:52 2014
@@ -211,7 +211,7 @@ public abstract class AMQDestination imp
{
}
- protected AMQDestination(Address address) throws Exception
+ protected AMQDestination(Address address)
{
this._address = address;
getInfoFromAddress();
@@ -749,7 +749,8 @@ public abstract class AMQDestination imp
}
}
- public static Destination createDestination(String str) throws Exception
+ public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType)
+ throws URISyntaxException
{
DestSyntax syntax = getDestType(str);
str = stripSyntaxPrefix(str);
@@ -760,7 +761,24 @@ public abstract class AMQDestination imp
else
{
Address address = createAddressFromString(str);
- return new AMQAnyDestination(address);
+ if(useNodeTypeForDestinationType)
+ {
+ AddressHelper helper = new AddressHelper(address);
+ switch(helper.getNodeType())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ return new AMQQueue(address);
+ case AMQDestination.TOPIC_TYPE:
+ return new AMQTopic(address);
+ default:
+ return new AMQAnyDestination(address);
+ }
+
+ }
+ else
+ {
+ return new AMQAnyDestination(address);
+ }
}
}
@@ -912,7 +930,7 @@ public abstract class AMQDestination imp
return Address.parse(str);
}
- private void getInfoFromAddress() throws Exception
+ private void getInfoFromAddress()
{
_name = _address.getName();
_subject = _address.getSubject();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Thu Aug 28 14:31:52 2014
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
+import javax.jms.Queue;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-
public class AMQQueue extends AMQDestination implements Queue
{
private static final long serialVersionUID = -1283142598932655606L;
@@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestina
super();
}
+ public AMQQueue(Address address)
+ {
+ super(address);
+ }
+
public AMQQueue(String address) throws URISyntaxException
{
super(address);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Aug 28 14:31:52 2014
@@ -1424,7 +1424,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1443,7 +1443,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1461,7 +1461,7 @@ public abstract class AMQSession<C exten
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1480,7 +1480,7 @@ public abstract class AMQSession<C exten
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -2590,7 +2590,7 @@ public abstract class AMQSession<C exten
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+ if (!(topic instanceof AMQDestination))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Aug 28 14:31:52 2014
@@ -774,7 +774,8 @@ public class AMQSession_0_8 extends AMQS
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
+ _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Aug 28 14:31:52 2014
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.url.BindingURL;
+import java.net.URISyntaxException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
-import java.net.URISyntaxException;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.url.BindingURL;
public class AMQTopic extends AMQDestination implements Topic
{
@@ -40,7 +40,7 @@ public class AMQTopic extends AMQDestina
super(address);
}
- public AMQTopic(Address address) throws Exception
+ public AMQTopic(Address address)
{
super(address);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Aug 28 14:31:52 2014
@@ -136,6 +136,7 @@ public abstract class BasicMessageConsum
private List<StackTraceElement> _closedStack = null;
private boolean _isDurableSubscriber = false;
+ private int _addressType = AMQDestination.UNKNOWN_TYPE;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -203,7 +204,7 @@ public abstract class BasicMessageConsum
}
_arguments = ft;
-
+ _addressType = _destination.getAddressType();
}
public AMQDestination getDestination()
@@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsum
{
_isDurableSubscriber = true;
}
+
+ void setAddressType(final int addressType)
+ {
+ _addressType = addressType;
+ }
+
+ int getAddressType()
+ {
+ return _addressType;
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Aug 28 14:31:52 2014
@@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 ex
return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(),
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
- _queueDestinationCache, _topicDestinationCache);
+ _queueDestinationCache, _topicDestinationCache, getAddressType());
}
@@ -164,4 +164,6 @@ public class BasicMessageConsumer_0_8 ex
{
return _rejectBehaviour;
}
+
+
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Aug 28 14:31:52 2014
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQException;
-
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.AMQException;
+
/**
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
@@ -43,6 +43,7 @@ class TopicSubscriberAdaptor<C extends B
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
+ consumer.setAddressType(AMQDestination.TOPIC_TYPE);
}
TopicSubscriberAdaptor(Topic topic, C consumer)
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Aug 28 14:31:52 2014
@@ -133,7 +133,7 @@ public class AMQMessageDelegate_0_10 ext
}
}
dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
- _deliveryProps.getRoutingKey(), subject);
+ _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
}
setJMSDestination(dest);
@@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 ext
}
else
{
- dest = convertToAddressBasedDestination(exchange,routingKey,null);
+ dest = convertToAddressBasedDestination(exchange,routingKey,null, false,
+ AMQDestination.UNKNOWN_TYPE);
}
_destinationCache.put(replyTo, dest);
}
@@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 ext
return dest;
}
}
-
- private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
- {
- String addr;
- boolean isQueue = true;
- if ("".equals(exchange)) // type Queue
- {
- subject = (subject == null) ? "" : "/" + subject;
- addr = routingKey + subject;
- }
- else
- {
- addr = exchange + "/" + routingKey;
- isQueue = false;
- }
-
- try
- {
- AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr);
- if (isQueue)
- {
- dest.setQueueName(new AMQShortString(routingKey));
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(""));
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(exchange));
- }
- return dest;
- }
- catch(Exception e)
- {
- // An exception is only thrown here if the address syntax is invalid.
- // Logging the exception, but not throwing as this is only important to Qpid developers.
- // An exception here means a bug in the code.
- _logger.error("Exception when constructing an address string from the ReplyTo struct");
-
- // falling back to the old way of doing it to ensure the application continues.
- return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
- }
- }
public void setJMSReplyTo(Destination destination) throws JMSException
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Aug 28 14:31:52 2014
@@ -21,6 +21,18 @@
package org.apache.qpid.client.message;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -32,17 +44,6 @@ import org.apache.qpid.framing.BasicCont
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
@@ -63,6 +64,7 @@ public class AMQMessageDelegate_0_8 exte
});
public static final String JMS_TYPE = "x-jms-type";
+ public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms");
private boolean _readableProperties = false;
@@ -96,7 +98,8 @@ public class AMQMessageDelegate_0_8 exte
// Used when generating a received message object
protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
{
this(contentHeader, deliveryTag);
@@ -104,28 +107,53 @@ public class AMQMessageDelegate_0_8 exte
AMQDestination dest = null;
- // If we have a type set the attempt to use that.
- if (type != null)
+ if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
{
- switch (type.intValue())
+ // If we have a type set then attempt to use that.
+ if (type != null)
{
- case AMQDestination.QUEUE_TYPE:
- dest = queueDestinationCache.getDestination(exchange, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = topicDestinationCache.getDestination(exchange, routingKey);
- break;
- default:
- // Use the generateDestination method
- dest = null;
+ switch (type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = queueDestinationCache.getDestination(exchange, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = topicDestinationCache.getDestination(exchange, routingKey);
+ break;
+ default:
+ // Use the generateDestination method
+ dest = null;
+ }
}
- }
- if (dest == null)
+ if (dest == null)
+ {
+ dest = generateDestination(exchange, routingKey);
+ }
+ }
+ else
{
- dest = generateDestination(exchange, routingKey);
+ String subject = null;
+ if (contentHeader.getHeaders() != null
+ && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT)
+ && STRICT_JMS)
+ {
+ subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT);
+ if (subject != null)
+ {
+ contentHeader.getHeaders().remove(QpidMessageProperties.QPID_SUBJECT);
+ contentHeader.getHeaders().setString("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER,
+ subject);
+ }
+ }
+ if(type == null)
+ {
+ type = addressType;
+ }
+ dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange),
+ AMQShortString.toString(routingKey), subject,
+ true, type);
}
-
setJMSDestination(dest);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.client.message;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This abstract class provides exchange lookup functionality that is shared
* between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHa
*/
public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class);
private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>();
@@ -222,6 +228,76 @@ public abstract class AbstractAMQMessage
{
return _session;
}
+
+ protected Destination convertToAddressBasedDestination(String exchange,
+ String routingKey,
+ String subject,
+ boolean useNodeTypeForDestinationType,
+ int type)
+ {
+ String addr;
+ boolean isQueue = true;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ isQueue = false;
+ }
+
+ if(useNodeTypeForDestinationType)
+ {
+ if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange))
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ addr = addr + " ; { node: { type: queue } } ";
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ addr = addr + " ; { node: { type: topic } } ";
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+
+ try
+ {
+ AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr,
+ useNodeTypeForDestinationType);
+ if (isQueue)
+ {
+ dest.setQueueName(new AMQShortString(routingKey));
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(""));
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(exchange));
+ }
+ return dest;
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
}
class ExchangeInfo
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Aug 28 14:31:52 2014
@@ -47,11 +47,14 @@ public abstract class AbstractJMSMessage
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
+ protected AbstractJMSMessage create08MessageWithBody(long messageNbr,
+ ContentHeaderBody contentHeader,
+ AMQShortString exchange,
+ AMQShortString routingKey,
List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -117,7 +120,8 @@ public abstract class AbstractJMSMessage
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
contentHeader.getProperties(),
- exchange, routingKey, queueDestinationCache, topicDestinationCache);
+ exchange, routingKey, queueDestinationCache,
+ topicDestinationCache, addressType);
return createMessage(delegate, data);
}
@@ -162,13 +166,15 @@ public abstract class AbstractJMSMessage
return message;
}
+ @Override
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey, List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
throws JMSException, AMQException
{
- final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
msg.setJMSRedelivered(redelivered);
msg.setReceivedFromServer();
return msg;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.message;
+import java.util.List;
+
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
@@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
-import javax.jms.JMSException;
-import java.util.List;
-
public interface MessageFactory
{
- AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.message;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +40,6 @@ import org.apache.qpid.transport.Deliver
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class MessageFactoryRegistry
{
/**
@@ -95,19 +96,24 @@ public class MessageFactoryRegistry
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
*
- *
- * @param deliveryTag the AMQ message id
+ * @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
* @param bodies a list of ContentBody instances @return the message.
* @param queueDestinationCache
- *@param topicDestinationCache @throws AMQException
+ * @param topicDestinationCache @throws AMQException
+ * @param addressType
* @throws JMSException
*/
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies,
+ public AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ ContentHeaderBody contentHeader,
+ List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws AMQException, JMSException
{
BasicContentHeaderProperties properties = contentHeader.getProperties();
@@ -124,7 +130,7 @@ public class MessageFactoryRegistry
mf = _default;
}
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
}
public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Aug 28 14:31:52 2014
@@ -159,7 +159,7 @@ public class AddressHelper
}
}
- public int getNodeType() throws Exception
+ public int getNodeType()
{
if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
@@ -176,7 +176,7 @@ public class AddressHelper
}
else
{
- throw new Exception("unkown exchange type");
+ throw new IllegalArgumentException("unknown exchange type");
}
}
@@ -212,7 +212,7 @@ public class AddressHelper
return (result == null) ? defaultValue : result.booleanValue();
}
- public Link getLink() throws Exception
+ public Link getLink()
{
Link link = new Link();
link.setSubscription(new Subscription());
@@ -235,7 +235,7 @@ public class AddressHelper
}
else
{
- throw new Exception("The reliability mode '" +
+ throw new IllegalArgumentException("The reliability mode '" +
reliability + "' is not yet supported");
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Aug 28 14:31:52 2014
@@ -255,7 +255,7 @@ public class PropertiesFileInitialContex
{
try
{
- return AMQDestination.createDestination(str);
+ return AMQDestination.createDestination(str, false);
}
catch (Exception e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org