You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/30 10:06:34 UTC

[2/2] qpid-broker-j git commit: QPID-7602: [Java Broker] Convert reply-to from internal format into AMQP 0-x

QPID-7602: [Java Broker] Convert reply-to from internal format into AMQP 0-x


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/aefd3e9a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/aefd3e9a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/aefd3e9a

Branch: refs/heads/master
Commit: aefd3e9a231cd0015d5d65e63fd7f2f113eba2fb
Parents: cf3baaf
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Aug 29 15:35:03 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Aug 30 11:01:27 2017 +0100

----------------------------------------------------------------------
 .../MessageConverter_Internal_to_v0_10.java     | 44 ++++++++---
 ...PropertyConverter_Internal_to_v0_10Test.java | 77 ++++++++++++++++++
 .../v0_8/MessageConverter_Internal_to_v0_8.java | 58 ++++++++++++--
 .../PropertyConverter_Internal_to_v0_8Test.java | 83 ++++++++++++++++++++
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 48 +++++++++--
 5 files changed, 287 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 700a0bc..4c74784 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -27,21 +27,26 @@ import java.util.Collections;
 import java.util.UUID;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
-import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.store.StoredMessage;
 
 @PluggableService
 public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage>
@@ -64,7 +69,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
     @Override
     public MessageTransferMessage convert(InternalMessage serverMsg, NamedAddressSpace addressSpace)
     {
-        return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+        return new MessageTransferMessage(convertToStoredMessage(serverMsg, addressSpace), null);
     }
 
     @Override
@@ -73,7 +78,8 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
 
     }
 
-    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg)
+    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg,
+                                                                       final NamedAddressSpace addressSpace)
     {
         Object messageBody = serverMsg.getMessageBody();
         ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
@@ -82,9 +88,8 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
 
         mimeType = improveMimeType(serverMsg, mimeType);
 
-        final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
-                                                                          mimeType,
-                                                                          messageContent.length);
+        final MessageMetaData_0_10 messageMetaData_0_10 =
+                convertMetaData(serverMsg, addressSpace, mimeType, messageContent.length);
         final int metadataSize = messageMetaData_0_10.getStorableSize();
 
         return new StoredMessage<MessageMetaData_0_10>()
@@ -162,7 +167,10 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
         return mimeType;
     }
 
-    private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+    private MessageMetaData_0_10 convertMetaData(final InternalMessage serverMsg,
+                                                 final NamedAddressSpace addressSpace,
+                                                 final String bodyMimeType,
+                                                 final int size)
     {
         DeliveryProperties deliveryProps = new DeliveryProperties();
         MessageProperties messageProps = new MessageProperties();
@@ -218,10 +226,28 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
                 messageProps.setUserId(bytes);
             }
         }
+
+        final String origReplyTo = messageHeader.getReplyTo();
+        if (origReplyTo != null && !origReplyTo.equals(""))
+        {
+            messageProps.setReplyTo(getReplyTo(addressSpace, origReplyTo));
+        }
+
         Header header = new Header(deliveryProps, messageProps, null);
         return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
     }
 
+    private ReplyTo getReplyTo(final NamedAddressSpace addressSpace, final String origReplyTo)
+    {
+        DestinationAddress destinationAddress = new DestinationAddress(addressSpace, origReplyTo);
+        MessageDestination messageDestination = destinationAddress.getMessageDestination();
+        final String exchange = ensureStr8("reply-to[\"exchange\"]", messageDestination instanceof Exchange
+                ? messageDestination.getName() : "");
+        final String routingKey = ensureStr8("reply-to[\"routing-key\"]", messageDestination instanceof Queue
+                ? messageDestination.getName() : destinationAddress.getRoutingKey());
+        return new ReplyTo(exchange, routingKey);
+    }
+
     private void validateValue(final Object value, final String path)
     {
         if (!EncoderUtils.isEncodable(value))

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
index d56ca24..69962fc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java
@@ -22,7 +22,9 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -43,9 +45,12 @@ import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessageMetaData;
 import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -60,6 +65,7 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase
     {
         super.setUp();
         _addressSpace = mock(NamedAddressSpace.class);
+        when(_addressSpace.getLocalAddress(anyString())).then(returnsFirstArg());
         _messageConverter = new MessageConverter_Internal_to_v0_10();
     }
 
@@ -354,6 +360,77 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase
                      convertedMessage.getHeader().getMessageProperties().getContentLength());
     }
 
+    public void testReplyToConversionWhenQueueIsSpecified() throws IOException
+    {
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        final String replyTo = "myTestQueue";
+        final Queue queue = mock(Queue.class);
+        when(queue.getName()).thenReturn(replyTo);
+        when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        InternalMessage originalMessage = createTestMessage(header);
+
+        MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        final ReplyTo convertedReplyTo =
+                convertedMessage.getHeader().getMessageProperties().getReplyTo();
+        assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange());
+        assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey());
+    }
+
+    public void testReplyToConversionWhenExchangeIsSpecified() throws IOException
+    {
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        final String replyTo = "myTestExchange";
+        final Exchange exchange = mock(Exchange.class);
+        when(exchange.getName()).thenReturn(replyTo);
+        when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        InternalMessage originalMessage = createTestMessage(header);
+
+        MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        final ReplyTo convertedReplyTo =
+                convertedMessage.getHeader().getMessageProperties().getReplyTo();
+        assertEquals("Unexpected exchange", replyTo, convertedReplyTo.getExchange());
+        assertEquals("Unexpected routing key", "", convertedReplyTo.getRoutingKey());
+    }
+
+    public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified() throws IOException
+    {
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        final String exchangeName = "testExchnageName";
+        final String routingKey = "testRoutingKey";
+        final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+        final Exchange exchange = mock(Exchange.class);
+        when(exchange.getName()).thenReturn(exchangeName);
+        when(_addressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        InternalMessage originalMessage = createTestMessage(header);
+
+        MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        final ReplyTo convertedReplyTo =
+                convertedMessage.getHeader().getMessageProperties().getReplyTo();
+        assertEquals("Unexpected exchange", exchangeName, convertedReplyTo.getExchange());
+        assertEquals("Unexpected routing key", routingKey, convertedReplyTo.getRoutingKey());
+    }
+
+    public void testReplyToConversionWhenReplyToCannotBeResolved() throws IOException
+    {
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        final String replyTo = "direct://amq.direct//test?routingkey='test'";
+        when(header.getReplyTo()).thenReturn(replyTo);
+        InternalMessage originalMessage = createTestMessage(header);
+
+        MessageTransferMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        final ReplyTo convertedReplyTo =
+                convertedMessage.getHeader().getMessageProperties().getReplyTo();
+        assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange());
+        assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey());
+    }
+
     private InternalMessage createTestMessage(final AMQMessageHeader header) throws IOException
     {
         return createTestMessage(header, null, false, System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index a8ff769..1678a8d 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -26,10 +26,15 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
@@ -58,7 +63,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
     @Override
     public AMQMessage convert(InternalMessage serverMsg, NamedAddressSpace addressSpace)
     {
-        return new AMQMessage(convertToStoredMessage(serverMsg), null);
+        return new AMQMessage(convertToStoredMessage(serverMsg, addressSpace), null);
     }
 
     @Override
@@ -67,7 +72,8 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
 
     }
 
-    private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg)
+    private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg,
+                                                                  final NamedAddressSpace addressSpace)
     {
         Object messageBody = serverMsg.getMessageBody();
         ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
@@ -85,9 +91,8 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
 
         mimeType = improveMimeType(serverMsg, mimeType);
 
-        final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
-                                                                    mimeType,
-                                                                    messageContent.length);
+        final MessageMetaData messageMetaData_0_8 =
+                convertMetaData(serverMsg, addressSpace, mimeType, messageContent.length);
         final int metadataSize = messageMetaData_0_8.getStorableSize();
 
         return new StoredMessage<MessageMetaData>()
@@ -164,7 +169,10 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
         return mimeType;
     }
 
-    private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
+    private MessageMetaData convertMetaData(final InternalMessage serverMsg,
+                                            final NamedAddressSpace addressSpace,
+                                            final String bodyMimeType,
+                                            final int size)
     {
 
         MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING,
@@ -181,7 +189,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
         props.setExpiration(serverMsg.getExpiration());
         props.setMessageId(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getMessageId()));
         props.setPriority(serverMsg.getMessageHeader().getPriority());
-        props.setReplyTo(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getReplyTo()));
+        props.setReplyTo(convertToShortStringForProperty("reply-to", getReplyTo(serverMsg, addressSpace)));
         props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
 
         props.setUserId(convertToOptionalAMQPShortString(serverMsg.getMessageHeader().getUserId()));
@@ -209,6 +217,42 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
         return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
     }
 
+    private String getReplyTo(final InternalMessage serverMsg, final NamedAddressSpace addressSpace)
+    {
+        String replyTo = serverMsg.getMessageHeader().getReplyTo();
+
+        if (replyTo != null)
+        {
+            DestinationAddress destinationAddress = new DestinationAddress(addressSpace, replyTo);
+            MessageDestination messageDestination = destinationAddress.getMessageDestination();
+
+            final String replyToBindingUrl;
+            if (messageDestination instanceof Exchange)
+            {
+                Exchange<?> exchange = (Exchange<?>) messageDestination;
+                replyToBindingUrl = String.format("%s://%s//?routingkey='%s'",
+                                                  exchange.getType(),
+                                                  exchange.getName(),
+                                                  destinationAddress.getRoutingKey());
+            }
+            else if (messageDestination instanceof Queue)
+            {
+                replyToBindingUrl = String.format("%s:////%s",
+                                                  ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                  messageDestination.getName());
+            }
+            else
+            {
+                replyToBindingUrl = String.format("%s:////?routingkey='%s'",
+                                                  ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                  destinationAddress.getRoutingKey());
+            }
+
+            return replyToBindingUrl;
+        }
+        return null;
+    }
+
     private AMQShortString convertToOptionalAMQPShortString(final String stringValue)
     {
         AMQShortString result;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
index b279bf6..0d8baee 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java
@@ -21,7 +21,9 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -30,12 +32,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessageMetaData;
 import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.store.StoredMessage;
@@ -52,6 +57,7 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase
         super.setUp();
         _messageConverter = new MessageConverter_Internal_to_v0_8();
         _addressSpace = mock(NamedAddressSpace.class);
+        when(_addressSpace.getLocalAddress(anyString())).then(returnsFirstArg());
     }
 
     public void testDurableTrueConversion()
@@ -273,6 +279,83 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase
         }
     }
 
+    public void testReplyToConversionWhenQueueIsSpecified()
+    {
+        final String replyTo = "testQueue";
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        Queue queue = mock(Queue.class);
+        when(queue.getName()).thenReturn(replyTo);
+        when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue);
+
+        InternalMessage originalMessage = createTestMessage(header);
+
+        AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        assertEquals("Unexpected reply-to",
+                     "direct:////" + replyTo,
+                     convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+    }
+
+    public void testReplyToConversionWhenExchangeIsSpecified()
+    {
+        final String replyTo = "testExchange";
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        Exchange exchange = mock(Exchange.class);
+        when(exchange.getName()).thenReturn(replyTo);
+        when(exchange.getType()).thenReturn(ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+        when(_addressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange);
+
+        InternalMessage originalMessage = createTestMessage(header);
+
+        AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        assertEquals("Unexpected reply-to",
+                     "fanout://" + replyTo + "//?routingkey=''",
+                     convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+    }
+
+    public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified()
+    {
+        final String exchangeName = "testExchange";
+        final String routingKey = "testKey";
+        final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.getReplyTo()).thenReturn(replyTo);
+        Exchange exchange = mock(Exchange.class);
+        when(exchange.getName()).thenReturn(exchangeName);
+        when(exchange.getType()).thenReturn(ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
+
+        when(_addressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange);
+
+        InternalMessage originalMessage = createTestMessage(header);
+
+        AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        assertEquals("Unexpected reply-to",
+                     "topic://" + exchangeName + "//?routingkey='" + routingKey + "'",
+                     convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+    }
+
+    public void testReplyToConversionWhenNonExistingExchangeAndRoutingKeyAreSpecified()
+    {
+        final String exchangeName = "testExchange";
+        final String routingKey = "testKey";
+        final String replyTo = String.format("%s/%s", exchangeName, routingKey);
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.getReplyTo()).thenReturn(replyTo);
+
+        InternalMessage originalMessage = createTestMessage(header);
+
+        AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace);
+
+        assertEquals("Unexpected reply-to",
+                     "direct:////?routingkey='" + replyTo + "'",
+                     convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString());
+    }
+
     private InternalMessage createTestMessage(final AMQMessageHeader header)
     {
         return createTestMessage(header, null, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aefd3e9a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 73e6b65..b309a56 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -39,7 +39,10 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
@@ -343,16 +346,47 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
 
     private AMQShortString getReplyTo(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace)
     {
-        // TODO : QPID-7602 - we probably need to look up the replyTo object and construct the correct BURL based on that
         final String replyTo = serverMsg.getMessageHeader().getReplyTo();
-        try
-        {
-            return AMQShortString.valueOf(replyTo);
-        }
-        catch (IllegalArgumentException e)
+
+        if (replyTo != null)
         {
-            throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'reply-to' failed.", e);
+            DestinationAddress destinationAddress = new DestinationAddress(addressSpace, replyTo);
+            MessageDestination messageDestination = destinationAddress.getMessageDestination();
+
+            final String replyToBindingUrl;
+            if (messageDestination instanceof Exchange)
+            {
+                Exchange<?> exchange = (Exchange<?>) messageDestination;
+                replyToBindingUrl = String.format("%s://%s//?routingkey='%s'",
+                                                  exchange.getType(),
+                                                  exchange.getName(),
+                                                  destinationAddress.getRoutingKey());
+            }
+            else if (messageDestination instanceof Queue)
+            {
+                replyToBindingUrl = String.format("%s:////%s",
+                                                  ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                  messageDestination.getName());
+            }
+            else
+            {
+                replyToBindingUrl = String.format("%s:////?routingkey='%s'",
+                                                  ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                  destinationAddress.getRoutingKey());
+            }
+
+            try
+            {
+                return AMQShortString.valueOf(replyToBindingUrl);
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new MessageConversionException(
+                        "Could not convert message from 1.0 to 0-8 because conversion of 'reply-to' failed.",
+                        e);
+            }
         }
+        return null;
     }
 
     private AMQShortString getCorrelationIdAsShortString(final Message_1_0 serverMsg)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org