You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/07/25 15:53:04 UTC

qpid-broker-j git commit: QPID-7857: [Java Broker] restore old routing behaviour by separating the to address from the routing address on ServerMessage

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master e62586d18 -> 7da6ca8f2


QPID-7857: [Java Broker] restore old routing behaviour by separating the to address from the routing address on ServerMessage


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7da6ca8f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7da6ca8f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7da6ca8f

Branch: refs/heads/master
Commit: 7da6ca8f24ac983be0ffc124d64a6f8cf1904fd5
Parents: e62586d
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Jul 25 13:07:57 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Jul 25 16:52:51 2017 +0100

----------------------------------------------------------------------
 .../apache/qpid/server/message/MessageInfo.java |   2 +
 .../qpid/server/message/MessageInfoImpl.java    |   9 +
 .../qpid/server/message/ServerMessage.java      |   2 +
 .../message/internal/InternalMessage.java       |  39 ++--
 .../internal/InternalMessageMetaDataType.java   |   2 +-
 .../server/virtualhost/AbstractVirtualHost.java |   2 +-
 .../message/AbstractServerMessageTest.java      |   6 +
 .../server/store/TestMessageMetaDataType.java   |   6 +
 .../qpid/server/txn/MockServerMessage.java      |   6 +
 .../MessageConverter_v0_10_to_Internal.java     |   2 +-
 .../protocol/v0_10/MessageMetaData_0_10.java    |   5 +
 .../protocol/v0_10/MessageTransferMessage.java  |   6 +
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  11 +-
 .../qpid/server/protocol/v0_8/AMQMessage.java   |   6 +
 .../v0_8/MessageConverter_v0_8_to_Internal.java |   2 +-
 .../v1_0/AnonymousRelayDestination.java         |   2 +-
 .../v1_0/MessageConverter_v1_0_to_Internal.java |   2 +-
 .../qpid/server/protocol/v1_0/Message_1_0.java  |  16 +-
 .../protocol/v1_0/NodeReceivingDestination.java |  18 +-
 .../protocol/v1_0/ReceivingDestination.java     |  10 -
 .../server/management/amqp/ManagementNode.java  |   2 +-
 .../apache/qpid/systest/MessageRoutingTest.java | 186 +++++++++++++++++++
 test-profiles/CPPExcludes                       |   3 +
 test-profiles/Java010Excludes                   |   5 +-
 test-profiles/JavaPre010Excludes                |   3 +
 25 files changed, 304 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
index 90f955f..0ad3df0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
@@ -50,4 +50,6 @@ public interface MessageInfo
     String getUserId();
     Map<String,Object> getHeaders();
     Date getNotValidBefore();
+    String getInitialRoutingAddress();
+    String getTo();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
index 60a820c..7626dc6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
@@ -48,6 +48,7 @@ public class MessageInfoImpl implements MessageInfo
     private final long _id;
     private final Map<String, Object> _headers;
     private final String _initialRoutingAddress;
+    private final String _to;
     private final Date _notValidBefore;
     private final String _messageType;
 
@@ -85,6 +86,7 @@ public class MessageInfoImpl implements MessageInfo
         _headerSize = message.getSizeIncludingHeader() - message.getSize();
         _id = message.getMessageNumber();
         _initialRoutingAddress = message.getInitialRoutingAddress();
+        _to = message.getTo();
         _notValidBefore = messageHeader.getNotValidBefore() == 0L ? null : new Date(messageHeader.getNotValidBefore());
 
         if(includeHeaders)
@@ -235,8 +237,15 @@ public class MessageInfoImpl implements MessageInfo
         return _notValidBefore == null ? null : new Date(_notValidBefore.getTime());
     }
 
+    @Override
     public String getInitialRoutingAddress()
     {
         return _initialRoutingAddress;
     }
+
+    @Override
+    public String getTo()
+    {
+        return _to;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index dcd307d..37df060 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -30,6 +30,8 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
 
     String getInitialRoutingAddress();
 
+    String getTo();
+
     AMQMessageHeader getMessageHeader();
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 78e930d..102bf09 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -49,18 +49,22 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
     private final Object _messageBody;
     private InternalMessageHeader _header;
     private String _initialRoutingAddress = "";
+    private final String _destinationName;
 
 
     InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
-                final InternalMessageHeader header,
-                final Object messageBody)
+                    final InternalMessageHeader header,
+                    final Object messageBody,
+                    final String destinationName)
     {
         super(handle, null);
         _header = header;
         _messageBody = messageBody;
+        _destinationName = destinationName;
     }
 
-    InternalMessage(final StoredMessage<InternalMessageMetaData> msg)
+    InternalMessage(final StoredMessage<InternalMessageMetaData> msg,
+                    final String destinationName)
     {
         super(msg, null);
         _header = msg.getMetaData().getHeader();
@@ -80,6 +84,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
             throw new ConnectionScopedRuntimeException("Object message contained an object which could not " +
                                                        "be deserialized", e);
         }
+        _destinationName = destinationName;
     }
 
     @Override
@@ -89,6 +94,12 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
     }
 
     @Override
+    public String getTo()
+    {
+        return _destinationName;
+    }
+
+    @Override
     public InternalMessageHeader getMessageHeader()
     {
         return _header;
@@ -125,7 +136,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
 
     public static InternalMessage createMessage(final MessageStore store,
                                                 final AMQMessageHeader header,
-                                                final Serializable bodyObject, final boolean persistent)
+                                                final Serializable bodyObject,
+                                                final boolean persistent,
+                                                final String destinationName)
     {
         InternalMessageHeader internalHeader;
         if(header instanceof InternalMessageHeader)
@@ -148,7 +161,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
             MessageHandle<InternalMessageMetaData> handle = store.addMessage(metaData);
             handle.addContent(QpidByteBuffer.wrap(bytes));
             StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
-            return new InternalMessage(storedMessage, internalHeader, bodyObject);
+            return new InternalMessage(storedMessage, internalHeader, bodyObject, destinationName);
         }
         catch (IOException e)
         {
@@ -164,17 +177,17 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
 
     public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody, boolean persistent)
     {
-        return createMessage(store, header, messageBody, persistent);
+        return createMessage(store, header, messageBody, persistent, null);
     }
 
     public static InternalMessage createMapMessage(MessageStore store, AMQMessageHeader header, Map<? extends Object,? extends Object> messageBody)
     {
-        return createMessage(store, header, new LinkedHashMap<Object,Object>(messageBody), false);
+        return createMessage(store, header, new LinkedHashMap<Object,Object>(messageBody), false, null);
     }
 
     public static InternalMessage createListMessage(MessageStore store, AMQMessageHeader header, List<? extends Object> messageBody)
     {
-        return createMessage(store, header, new ArrayList<Object>(messageBody), false);
+        return createMessage(store, header, new ArrayList<Object>(messageBody), false, null);
     }
 
     public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody)
@@ -185,14 +198,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
 
     public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody, boolean persist)
     {
-        return createMessage(store, header, messageBody, persist);
+        return createMessage(store, header, messageBody, persist, null);
     }
 
-    public static InternalMessage convert(long messageNumber, boolean persistent, AMQMessageHeader header, Object messageBody)
+    public static InternalMessage convert(long messageNumber,
+                                          boolean persistent,
+                                          AMQMessageHeader header,
+                                          Object messageBody,
+                                          final String destinationName)
     {
         InternalMessageHeader convertedHeader = new InternalMessageHeader(header);
         StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody);
-        return new InternalMessage(handle, convertedHeader, messageBody);
+        return new InternalMessage(handle, convertedHeader, messageBody, destinationName);
     }
 
     private static StoredMessage<InternalMessageMetaData> createReadOnlyHandle(final long messageNumber,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
index 507837c..f174497 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
@@ -67,7 +67,7 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal
     @Override
     public ServerMessage<InternalMessageMetaData> createMessage(final StoredMessage<InternalMessageMetaData> msg)
     {
-        return new InternalMessage(msg);
+        return new InternalMessage(msg, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0146ffd..da32b3d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -798,7 +798,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
             }
         }
 
-        InternalMessage internalMessage = InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent());
+        InternalMessage internalMessage = InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent(), address);
         AutoCommitTransaction txn = new AutoCommitTransaction(getMessageStore());
         final InstanceProperties instanceProperties =
                 new InstanceProperties()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
index 1b2cb8f..5a8dc84 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
@@ -48,6 +48,12 @@ public class AbstractServerMessageTest extends QpidTestCase
         }
 
         @Override
+        public String getTo()
+        {
+            return null;
+        }
+
+        @Override
         public AMQMessageHeader getMessageHeader()
         {
             return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 52d6e0f..50d9428 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -157,6 +157,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         }
 
         @Override
+        public String getTo()
+        {
+            return null;
+        }
+
+        @Override
         public long getSize()
         {
             return 0;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index fcb7c92..da61a64 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -97,6 +97,12 @@ class MockServerMessage implements ServerMessage
     }
 
     @Override
+    public String getTo()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public AMQMessageHeader getMessageHeader()
     {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index bb98133..8d0d9e6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
         Object body = convertMessageBody(mimeType, data);
         MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
         AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body);
+        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body, serverMessage.getTo());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index e1d790b..de1aad1 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -216,6 +216,11 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
         return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
     }
 
+    public String getExchange()
+    {
+        return _deliveryProps == null ? null : _deliveryProps.getExchange();
+    }
+
     public AMQMessageHeader getMessageHeader()
     {
         return _messageHeader;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 1afa7ac..e6b2c8d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -56,6 +56,12 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
     }
 
     @Override
+    public String getTo()
+    {
+        return getMetaData().getExchange();
+    }
+
+    @Override
     public AMQMessageHeader getMessageHeader()
     {
         return getMetaData().getMessageHeader();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7f4bb2e..b3a3266 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -423,7 +423,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     }
                     final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
 
-                    final AMQMessage amqMessage = createAMQMessage(storedMessage);
+                    final AMQMessage amqMessage = new AMQMessage(storedMessage, _connection.getReference());
                     try (MessageReference reference = amqMessage.newReference())
                     {
 
@@ -1230,15 +1230,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         return _clientDeliveryMethod;
     }
 
-
-    private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle)
-    {
-
-        AMQMessage message = new AMQMessage(handle, _connection.getReference());
-
-        return message;
-    }
-
     public Subject getSubject()
     {
         return _subject;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 7dd94e2..faab357 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -74,6 +74,12 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
     }
 
     @Override
+    public String getTo()
+    {
+        return AMQShortString.toString(getMessagePublishInfo().getExchange());
+    }
+
+    @Override
     public AMQMessageHeader getMessageHeader()
     {
         return getMessageMetaData().getMessageHeader();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 081ddcf..d16a239 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
         Object body = convertMessageBody(mimeType, data);
 
         return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
-                new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body);
+                new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body, serverMessage.getTo());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index 4ae80f3..fd53dc8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -75,7 +75,7 @@ public class AnonymousRelayDestination implements ReceivingDestination
     public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
     {
         final ReceivingDestination destination;
-        final String routingAddress = message.getInitialRoutingAddress();
+        final String routingAddress = message.getTo();
         if (!routingAddress.startsWith("/") && routingAddress.contains("/"))
         {
             String[] parts = routingAddress.split("/", 2);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index a11f5ed..6cecd57 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -56,7 +56,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
     {
         Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage);
 
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
+        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject, serverMessage.getTo());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 2cd18ab..25e7666 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -74,13 +74,8 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
     public String getInitialRoutingAddress()
     {
         MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
-        String routingAddress = null;
-        final String to = messageHeader.getTo();
-        if (to != null)
-        {
-            routingAddress = to;
-        }
-        else if (messageHeader.getHeader("routing-key") instanceof String)
+        final String routingAddress;
+        if (messageHeader.getHeader("routing-key") instanceof String)
         {
             routingAddress = (String) messageHeader.getHeader("routing-key");
         }
@@ -100,6 +95,13 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
         return routingAddress;
     }
 
+    @Override
+    public String getTo()
+    {
+        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
+        return messageHeader.getTo();
+    }
+
     private MessageMetaData_1_0 getMessageMetaData()
     {
         MessageMetaData_1_0 metaData = getStoredMessage().getMetaData();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index ae161aa..7bb9044 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -84,9 +84,7 @@ public class NodeReceivingDestination implements ReceivingDestination
     @Override
     public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
     {
-        final String routingAddress = _routingAddress == null
-                ? ReceivingDestination.getRoutingAddress(message, _address)
-                : _routingAddress;
+        final String routingAddress = _routingAddress == null ? getRoutingAddress(message) : _routingAddress;
         _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =
@@ -150,6 +148,20 @@ public class NodeReceivingDestination implements ReceivingDestination
         return ACCEPTED;
     }
 
+    private String getRoutingAddress(final ServerMessage<?> message)
+    {
+        String initialRoutingAddress = message.getInitialRoutingAddress();
+        if (initialRoutingAddress == null || "".equals(initialRoutingAddress))
+        {
+            initialRoutingAddress = message.getTo() == null ? "" : message.getTo();
+        }
+        if (_address != null && initialRoutingAddress.startsWith(_address + "/"))
+        {
+            initialRoutingAddress = initialRoutingAddress.substring(_address.length() + 1);
+        }
+        return initialRoutingAddress;
+    }
+
     private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
     {
         Rejected rejected = new Rejected();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index fc82f49..a6a910f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -44,14 +44,4 @@ public interface ReceivingDestination
     String getAddress();
 
     MessageDestination getMessageDestination();
-
-    static String getRoutingAddress(final ServerMessage<?> message, final String destinationName)
-    {
-        String initialRoutingAddress = message.getInitialRoutingAddress();
-        if (destinationName != null && initialRoutingAddress.startsWith(destinationName + "/"))
-        {
-            initialRoutingAddress = initialRoutingAddress.substring(destinationName.length() + 1);
-        }
-        return initialRoutingAddress;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 7650546..e72ca8b 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -900,7 +900,7 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
         else if (convertedValue instanceof Serializable)
         {
             return InternalMessage.createMessage(_addressSpace.getMessageStore(), responseHeader,
-                                                 (Serializable) convertedValue, false);
+                                                 (Serializable) convertedValue, false, null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java b/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
new file mode 100644
index 0000000..d8de19b
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class MessageRoutingTest extends QpidBrokerTestCase
+{
+    private static final String EXCHANGE_NAME = "testExchange";
+    private static final String QUEUE_NAME = "testQueue";
+    private static final String ROUTING_KEY = "testRoute";
+    private Connection _connection;
+    private Session _session;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _connection = getConnectionBuilder().build();
+        _connection.start();
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        createEntityUsingAmqpManagement(EXCHANGE_NAME, _session, "org.apache.qpid.DirectExchange",
+                                        Collections.singletonMap(Exchange.UNROUTABLE_MESSAGE_BEHAVIOUR, "REJECT"));
+        createEntityUsingAmqpManagement(QUEUE_NAME, _session, "org.apache.qpid.Queue");
+
+        final Map<String, Object> arguments = new HashMap<>();
+        arguments.put("destination", QUEUE_NAME);
+        arguments.put("bindingKey", ROUTING_KEY);
+        performOperationUsingAmqpManagement(EXCHANGE_NAME, "bind", _session, "org.apache.qpid.Exchange",
+                                            arguments);
+    }
+
+    public void testRoutingWithSubjectSetAsJMSMessageType() throws Exception
+    {
+        Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+        message.setJMSType(ROUTING_KEY);
+
+        MessageProducer messageProducer = _session.createProducer(sendingDestination);
+        messageProducer.send(message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testAnonymousRelayRoutingWithSubjectSetAsJMSMessageType() throws Exception
+    {
+        Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+        message.setJMSType(ROUTING_KEY);
+
+        MessageProducer messageProducer = _session.createProducer(null);
+        messageProducer.send(sendingDestination, message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testRoutingWithRoutingKeySetAsJMSProperty() throws Exception
+    {
+        Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+        message.setStringProperty("routing_key", ROUTING_KEY);
+
+        MessageProducer messageProducer = _session.createProducer(sendingDestination);
+        messageProducer.send(message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testRoutingWithExchangeAndRoutingKeyDestination() throws Exception
+    {
+        Destination sendingDestination = _session.createTopic(EXCHANGE_NAME + "/" + ROUTING_KEY);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+
+        MessageProducer messageProducer = _session.createProducer(sendingDestination);
+        messageProducer.send(message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testAnonymousRelayRoutingWithExchangeAndRoutingKeyDestination() throws Exception
+    {
+        Destination sendingDestination = _session.createTopic(EXCHANGE_NAME + "/" + ROUTING_KEY);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+
+        MessageProducer messageProducer = _session.createProducer(null);
+        messageProducer.send(sendingDestination, message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testRoutingToQueue() throws Exception
+    {
+        Destination sendingDestination = _session.createQueue(QUEUE_NAME);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+
+        MessageProducer messageProducer = _session.createProducer(sendingDestination);
+        messageProducer.send(message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+
+    public void testAnonymousRelayRoutingToQueue() throws Exception
+    {
+        Destination sendingDestination = _session.createQueue(QUEUE_NAME);
+        Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+        Message message = _session.createTextMessage("test");
+
+        MessageProducer messageProducer = _session.createProducer(null);
+        messageProducer.send(sendingDestination, message);
+
+        MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("test", ((TextMessage)message).getText());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 9f6a0df..82070f8 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -254,3 +254,6 @@ org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueu
 
 // Tests require AMQP management
 org.apache.qpid.server.routing.AlternateBindingRoutingTest#*
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index 19463d6..283f86d 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -90,4 +90,7 @@ org.apache.qpid.tests.protocol.v1_0.*
 
 // QPID-7818: Messages published into non existing queue are discarded on 0-10 path
 org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue
-org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueue
\ No newline at end of file
+org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueue
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index 334d67d..5f0f8f9 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -77,3 +77,6 @@ org.apache.qpid.systests.jms_2_0.*
 
 // Exclude 1.0 protocol tests
 org.apache.qpid.tests.protocol.v1_0.*
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org