You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/30 10:06:34 UTC
[2/2] qpid-broker-j git commit: QPID-7602: [Java Broker] Convert
reply-to from internal format into AMQP 0-x
QPID-7602: [Java Broker] Convert reply-to from internal format into AMQP 0-x
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/aefd3e9a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/aefd3e9a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/aefd3e9a
Branch: refs/heads/master
Commit: aefd3e9a231cd0015d5d65e63fd7f2f113eba2fb
Parents: cf3baaf
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Aug 29 15:35:03 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Aug 30 11:01:27 2017 +0100
----------------------------------------------------------------------
.../MessageConverter_Internal_to_v0_10.java | 44 ++++++++---
...PropertyConverter_Internal_to_v0_10Test.java | 77 ++++++++++++++++++
.../v0_8/MessageConverter_Internal_to_v0_8.java | 58 ++++++++++++--
.../PropertyConverter_Internal_to_v0_8Test.java | 83 ++++++++++++++++++++
.../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 48 +++++++++--
5 files changed, 287 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 700a0bc..4c74784 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -27,21 +27,26 @@ import java.util.Collections;
import java.util.UUID;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
-import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.store.StoredMessage;
@PluggableService
public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage>
@@ -64,7 +69,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
@Override
public MessageTransferMessage convert(InternalMessage serverMsg, NamedAddressSpace addressSpace)
{
- return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+ return new MessageTransferMessage(convertToStoredMessage(serverMsg, addressSpace), null);
}
@Override
@@ -73,7 +78,8 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
- private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg)
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg,
+ final NamedAddressSpace addressSpace)
{
Object messageBody = serverMsg.getMessageBody();
ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
@@ -82,9 +88,8 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
mimeType = improveMimeType(serverMsg, mimeType);
- final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
- mimeType,
- messageContent.length);
+ final MessageMetaData_0_10 messageMetaData_0_10 =
+ convertMetaData(serverMsg, addressSpace, mimeType, messageContent.length);
final int metadataSize = messageMetaData_0_10.getStorableSize();
return new StoredMessage<MessageMetaData_0_10>()
@@ -162,7 +167,10 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
return mimeType;
}
- private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData_0_10 convertMetaData(final InternalMessage serverMsg,
+ final NamedAddressSpace addressSpace,
+ final String bodyMimeType,
+ final int size)
{
DeliveryProperties deliveryProps = new DeliveryProperties();
MessageProperties messageProps = new MessageProperties();
@@ -218,10 +226,28 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
messageProps.setUserId(bytes);
}
}
+
+ final String origReplyTo = messageHeader.getReplyTo();
+ if (origReplyTo != null && !origReplyTo.equals(""))
+ {
+ messageProps.setReplyTo(getReplyTo(addressSpace, origReplyTo));
+ }
+
Header header = new Header(deliveryProps, messageProps, null);
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
}
+ private ReplyTo getReplyTo(final NamedAddressSpace addressSpace, final String origReplyTo)
+ {
+ DestinationAddress destinationAddress = new DestinationAddress(addressSpace, origReplyTo);
+ MessageDestination messageDestination = destinationAddress.getMessageDestination();
+ final String exchange = ensureStr8("reply-to[\"exchange\"]", messageDestination instanceof Exchange
+ ? messageDestination.getName() : "");
+ final String routingKey = ensureStr8("reply-to[\"routing-key\"]", messageDestination instanceof Queue
+ ? messageDestination.getName() : destinationAddress.getRoutingKey());
+ return new ReplyTo(exchange, routingKey);
+ }
+
private void validateValue(final Object value, final String path)
{
if (!EncoderUtils.isEncodable(value))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
index d56ca24..69962fc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
@@ -22,7 +22,9 @@
package org.apache.qpid.server.protocol.v0_10;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,9 +45,12 @@ import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessageMetaData;
import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -60,6 +65,7 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase
{
super.setUp();
_addressSpace = mock(NamedAddressSpace.class);
+ when(_addressSpace.getLocalAddress(anyString())).then(returnsFirstArg());
_messageConverter = new MessageConverter_Internal_to_v0_10();
}
@@ -354,6 +360,77 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase
convertedMessage.getHeader().getMessageProperties().getContentLength());
}
+ public void testReplyToConversionWhenQueueIsSpecified() throws IOException
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ final String replyTo = "myTestQueue";
+ final Queue queue = mock(Queue.class);
+ when(queue.getName()).thenReturn(replyTo);
+ when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ InternalMessage originalMessage = createTestMessage(header);
+
+ MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ final ReplyTo convertedReplyTo =
+ convertedMessage.getHeader().getMessageProperties().getReplyTo();
+ assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange());
+ assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey());
+ }
+
+ public void testReplyToConversionWhenExchangeIsSpecified() throws IOException
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ final String replyTo = "myTestExchange";
+ final Exchange exchange = mock(Exchange.class);
+ when(exchange.getName()).thenReturn(replyTo);
+ when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ InternalMessage originalMessage = createTestMessage(header);
+
+ MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ final ReplyTo convertedReplyTo =
+ convertedMessage.getHeader().getMessageProperties().getReplyTo();
+ assertEquals("Unexpected exchange", replyTo, convertedReplyTo.getExchange());
+ assertEquals("Unexpected routing key", "", convertedReplyTo.getRoutingKey());
+ }
+
+ public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified() throws IOException
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ final String exchangeName = "testExchnageName";
+ final String routingKey = "testRoutingKey";
+ final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+ final Exchange exchange = mock(Exchange.class);
+ when(exchange.getName()).thenReturn(exchangeName);
+ when(_addressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ InternalMessage originalMessage = createTestMessage(header);
+
+ MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ final ReplyTo convertedReplyTo =
+ convertedMessage.getHeader().getMessageProperties().getReplyTo();
+ assertEquals("Unexpected exchange", exchangeName, convertedReplyTo.getExchange());
+ assertEquals("Unexpected routing key", routingKey, convertedReplyTo.getRoutingKey());
+ }
+
+ public void testReplyToConversionWhenReplyToCannotBeResolved() throws IOException
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ final String replyTo = "direct://amq.direct//test?routingkey='test'";
+ when(header.getReplyTo()).thenReturn(replyTo);
+ InternalMessage originalMessage = createTestMessage(header);
+
+ MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ final ReplyTo convertedReplyTo =
+ convertedMessage.getHeader().getMessageProperties().getReplyTo();
+ assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange());
+ assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey());
+ }
+
private InternalMessage createTestMessage(final AMQMessageHeader header) throws IOException
{
return createTestMessage(header, null, false, System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index a8ff769..1678a8d 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -26,10 +26,15 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
@@ -58,7 +63,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
@Override
public AMQMessage convert(InternalMessage serverMsg, NamedAddressSpace addressSpace)
{
- return new AMQMessage(convertToStoredMessage(serverMsg), null);
+ return new AMQMessage(convertToStoredMessage(serverMsg, addressSpace), null);
}
@Override
@@ -67,7 +72,8 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
- private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg)
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg,
+ final NamedAddressSpace addressSpace)
{
Object messageBody = serverMsg.getMessageBody();
ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
@@ -85,9 +91,8 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
mimeType = improveMimeType(serverMsg, mimeType);
- final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
- mimeType,
- messageContent.length);
+ final MessageMetaData messageMetaData_0_8 =
+ convertMetaData(serverMsg, addressSpace, mimeType, messageContent.length);
final int metadataSize = messageMetaData_0_8.getStorableSize();
return new StoredMessage<MessageMetaData>()
@@ -164,7 +169,10 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
return mimeType;
}
- private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData convertMetaData(final InternalMessage serverMsg,
+ final NamedAddressSpace addressSpace,
+ final String bodyMimeType,
+ final int size)
{
MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING,
@@ -181,7 +189,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
props.setExpiration(serverMsg.getExpiration());
props.setMessageId(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getMessageId()));
props.setPriority(serverMsg.getMessageHeader().getPriority());
- props.setReplyTo(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getReplyTo()));
+ props.setReplyTo(convertToShortStringForProperty("reply-to", getReplyTo(serverMsg, addressSpace)));
props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
props.setUserId(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getUserId()));
@@ -209,6 +217,42 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
}
+ private String getReplyTo(final InternalMessage serverMsg, final NamedAddressSpace addressSpace)
+ {
+ String replyTo = serverMsg.getMessageHeader().getReplyTo();
+
+ if (replyTo != null)
+ {
+ DestinationAddress destinationAddress = new DestinationAddress(addressSpace, replyTo);
+ MessageDestination messageDestination = destinationAddress.getMessageDestination();
+
+ final String replyToBindingUrl;
+ if (messageDestination instanceof Exchange)
+ {
+ Exchange<?> exchange = (Exchange<?>) messageDestination;
+ replyToBindingUrl = String.format("%s://%s//?routingkey='%s'",
+ exchange.getType(),
+ exchange.getName(),
+ destinationAddress.getRoutingKey());
+ }
+ else if (messageDestination instanceof Queue)
+ {
+ replyToBindingUrl = String.format("%s:////%s",
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+ messageDestination.getName());
+ }
+ else
+ {
+ replyToBindingUrl = String.format("%s:////?routingkey='%s'",
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+ destinationAddress.getRoutingKey());
+ }
+
+ return replyToBindingUrl;
+ }
+ return null;
+ }
+
private AMQShortString convertToOptionalAMQPShortString(final String stringValue)
{
AMQShortString result;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
index b279bf6..0d8baee 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
@@ -21,7 +21,9 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,12 +32,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessageMetaData;
import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.store.StoredMessage;
@@ -52,6 +57,7 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase
super.setUp();
_messageConverter = new MessageConverter_Internal_to_v0_8();
_addressSpace = mock(NamedAddressSpace.class);
+ when(_addressSpace.getLocalAddress(anyString())).then(returnsFirstArg());
}
public void testDurableTrueConversion()
@@ -273,6 +279,83 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase
}
}
+ public void testReplyToConversionWhenQueueIsSpecified()
+ {
+ final String replyTo = "testQueue";
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ Queue queue = mock(Queue.class);
+ when(queue.getName()).thenReturn(replyTo);
+ when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue);
+
+ InternalMessage originalMessage = createTestMessage(header);
+
+ AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ assertEquals("Unexpected reply-to",
+ "direct:////" + replyTo,
+ convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+ }
+
+ public void testReplyToConversionWhenExchangeIsSpecified()
+ {
+ final String replyTo = "testExchange";
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ Exchange exchange = mock(Exchange.class);
+ when(exchange.getName()).thenReturn(replyTo);
+ when(exchange.getType()).thenReturn(ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+ when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange);
+
+ InternalMessage originalMessage = createTestMessage(header);
+
+ AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ assertEquals("Unexpected reply-to",
+ "fanout://" + replyTo + "//?routingkey=''",
+ convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+ }
+
+ public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified()
+ {
+ final String exchangeName = "testExchange";
+ final String routingKey = "testKey";
+ final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getReplyTo()).thenReturn(replyTo);
+ Exchange exchange = mock(Exchange.class);
+ when(exchange.getName()).thenReturn(exchangeName);
+ when(exchange.getType()).thenReturn(ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
+
+ when(_addressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange);
+
+ InternalMessage originalMessage = createTestMessage(header);
+
+ AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ assertEquals("Unexpected reply-to",
+ "topic://" + exchangeName + "//?routingkey='" + routingKey + "'",
+ convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+ }
+
+ public void testReplyToConversionWhenNonExistingExchangeAndRoutingKeyAreSpecified()
+ {
+ final String exchangeName = "testExchange";
+ final String routingKey = "testKey";
+ final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getReplyTo()).thenReturn(replyTo);
+
+ InternalMessage originalMessage = createTestMessage(header);
+
+ AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+ assertEquals("Unexpected reply-to",
+ "direct:////?routingkey='" + replyTo + "'",
+ convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+ }
+
private InternalMessage createTestMessage(final AMQMessageHeader header)
{
return createTestMessage(header, null, false);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 73e6b65..b309a56 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -39,7 +39,10 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
@@ -343,16 +346,47 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
private AMQShortString getReplyTo(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace)
{
- // TODO : QPID-7602 - we probably need to look up the replyTo object and construct the correct BURL based on that
final String replyTo = serverMsg.getMessageHeader().getReplyTo();
- try
- {
- return AMQShortString.valueOf(replyTo);
- }
- catch (IllegalArgumentException e)
+
+ if (replyTo != null)
{
- throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'reply-to' failed.", e);
+ DestinationAddress destinationAddress = new DestinationAddress(addressSpace, replyTo);
+ MessageDestination messageDestination = destinationAddress.getMessageDestination();
+
+ final String replyToBindingUrl;
+ if (messageDestination instanceof Exchange)
+ {
+ Exchange<?> exchange = (Exchange<?>) messageDestination;
+ replyToBindingUrl = String.format("%s://%s//?routingkey='%s'",
+ exchange.getType(),
+ exchange.getName(),
+ destinationAddress.getRoutingKey());
+ }
+ else if (messageDestination instanceof Queue)
+ {
+ replyToBindingUrl = String.format("%s:////%s",
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+ messageDestination.getName());
+ }
+ else
+ {
+ replyToBindingUrl = String.format("%s:////?routingkey='%s'",
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+ destinationAddress.getRoutingKey());
+ }
+
+ try
+ {
+ return AMQShortString.valueOf(replyToBindingUrl);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException(
+ "Could not convert message from 1.0 to 0-8 because conversion of 'reply-to' failed.",
+ e);
+ }
}
+ return null;
}
private AMQShortString getCorrelationIdAsShortString(final Message_1_0 serverMsg)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org