You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/07/25 15:53:04 UTC
qpid-broker-j git commit: QPID-7857: [Java Broker] restore old routing behaviour by separating the to address from the routing address on ServerMessage
Repository: qpid-broker-j
Updated Branches:
refs/heads/master e62586d18 -> 7da6ca8f2
QPID-7857: [Java Broker] restore old routing behaviour by separating the to address from the routing address on ServerMessage
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7da6ca8f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7da6ca8f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7da6ca8f
Branch: refs/heads/master
Commit: 7da6ca8f24ac983be0ffc124d64a6f8cf1904fd5
Parents: e62586d
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Jul 25 13:07:57 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Jul 25 16:52:51 2017 +0100
----------------------------------------------------------------------
.../apache/qpid/server/message/MessageInfo.java | 2 +
.../qpid/server/message/MessageInfoImpl.java | 9 +
.../qpid/server/message/ServerMessage.java | 2 +
.../message/internal/InternalMessage.java | 39 ++--
.../internal/InternalMessageMetaDataType.java | 2 +-
.../server/virtualhost/AbstractVirtualHost.java | 2 +-
.../message/AbstractServerMessageTest.java | 6 +
.../server/store/TestMessageMetaDataType.java | 6 +
.../qpid/server/txn/MockServerMessage.java | 6 +
.../MessageConverter_v0_10_to_Internal.java | 2 +-
.../protocol/v0_10/MessageMetaData_0_10.java | 5 +
.../protocol/v0_10/MessageTransferMessage.java | 6 +
.../qpid/server/protocol/v0_8/AMQChannel.java | 11 +-
.../qpid/server/protocol/v0_8/AMQMessage.java | 6 +
.../v0_8/MessageConverter_v0_8_to_Internal.java | 2 +-
.../v1_0/AnonymousRelayDestination.java | 2 +-
.../v1_0/MessageConverter_v1_0_to_Internal.java | 2 +-
.../qpid/server/protocol/v1_0/Message_1_0.java | 16 +-
.../protocol/v1_0/NodeReceivingDestination.java | 18 +-
.../protocol/v1_0/ReceivingDestination.java | 10 -
.../server/management/amqp/ManagementNode.java | 2 +-
.../apache/qpid/systest/MessageRoutingTest.java | 186 +++++++++++++++++++
test-profiles/CPPExcludes | 3 +
test-profiles/Java010Excludes | 5 +-
test-profiles/JavaPre010Excludes | 3 +
25 files changed, 304 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
index 90f955f..0ad3df0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
@@ -50,4 +50,6 @@ public interface MessageInfo
String getUserId();
Map<String,Object> getHeaders();
Date getNotValidBefore();
+ String getInitialRoutingAddress();
+ String getTo();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
index 60a820c..7626dc6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
@@ -48,6 +48,7 @@ public class MessageInfoImpl implements MessageInfo
private final long _id;
private final Map<String, Object> _headers;
private final String _initialRoutingAddress;
+ private final String _to;
private final Date _notValidBefore;
private final String _messageType;
@@ -85,6 +86,7 @@ public class MessageInfoImpl implements MessageInfo
_headerSize = message.getSizeIncludingHeader() - message.getSize();
_id = message.getMessageNumber();
_initialRoutingAddress = message.getInitialRoutingAddress();
+ _to = message.getTo();
_notValidBefore = messageHeader.getNotValidBefore() == 0L ? null : new Date(messageHeader.getNotValidBefore());
if(includeHeaders)
@@ -235,8 +237,15 @@ public class MessageInfoImpl implements MessageInfo
return _notValidBefore == null ? null : new Date(_notValidBefore.getTime());
}
+ @Override
public String getInitialRoutingAddress()
{
return _initialRoutingAddress;
}
+
+ @Override
+ public String getTo()
+ {
+ return _to;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index dcd307d..37df060 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -30,6 +30,8 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
String getInitialRoutingAddress();
+ String getTo();
+
AMQMessageHeader getMessageHeader();
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 78e930d..102bf09 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -49,18 +49,22 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
private final Object _messageBody;
private InternalMessageHeader _header;
private String _initialRoutingAddress = "";
+ private final String _destinationName;
InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
- final InternalMessageHeader header,
- final Object messageBody)
+ final InternalMessageHeader header,
+ final Object messageBody,
+ final String destinationName)
{
super(handle, null);
_header = header;
_messageBody = messageBody;
+ _destinationName = destinationName;
}
- InternalMessage(final StoredMessage<InternalMessageMetaData> msg)
+ InternalMessage(final StoredMessage<InternalMessageMetaData> msg,
+ final String destinationName)
{
super(msg, null);
_header = msg.getMetaData().getHeader();
@@ -80,6 +84,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
throw new ConnectionScopedRuntimeException("Object message contained an object which could not " +
"be deserialized", e);
}
+ _destinationName = destinationName;
}
@Override
@@ -89,6 +94,12 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
+ public String getTo()
+ {
+ return _destinationName;
+ }
+
+ @Override
public InternalMessageHeader getMessageHeader()
{
return _header;
@@ -125,7 +136,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
public static InternalMessage createMessage(final MessageStore store,
final AMQMessageHeader header,
- final Serializable bodyObject, final boolean persistent)
+ final Serializable bodyObject,
+ final boolean persistent,
+ final String destinationName)
{
InternalMessageHeader internalHeader;
if(header instanceof InternalMessageHeader)
@@ -148,7 +161,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
MessageHandle<InternalMessageMetaData> handle = store.addMessage(metaData);
handle.addContent(QpidByteBuffer.wrap(bytes));
StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
- return new InternalMessage(storedMessage, internalHeader, bodyObject);
+ return new InternalMessage(storedMessage, internalHeader, bodyObject, destinationName);
}
catch (IOException e)
{
@@ -164,17 +177,17 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody, boolean persistent)
{
- return createMessage(store, header, messageBody, persistent);
+ return createMessage(store, header, messageBody, persistent, null);
}
public static InternalMessage createMapMessage(MessageStore store, AMQMessageHeader header, Map<? extends Object,? extends Object> messageBody)
{
- return createMessage(store, header, new LinkedHashMap<Object,Object>(messageBody), false);
+ return createMessage(store, header, new LinkedHashMap<Object,Object>(messageBody), false, null);
}
public static InternalMessage createListMessage(MessageStore store, AMQMessageHeader header, List<? extends Object> messageBody)
{
- return createMessage(store, header, new ArrayList<Object>(messageBody), false);
+ return createMessage(store, header, new ArrayList<Object>(messageBody), false, null);
}
public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody)
@@ -185,14 +198,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody, boolean persist)
{
- return createMessage(store, header, messageBody, persist);
+ return createMessage(store, header, messageBody, persist, null);
}
- public static InternalMessage convert(long messageNumber, boolean persistent, AMQMessageHeader header, Object messageBody)
+ public static InternalMessage convert(long messageNumber,
+ boolean persistent,
+ AMQMessageHeader header,
+ Object messageBody,
+ final String destinationName)
{
InternalMessageHeader convertedHeader = new InternalMessageHeader(header);
StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody);
- return new InternalMessage(handle, convertedHeader, messageBody);
+ return new InternalMessage(handle, convertedHeader, messageBody, destinationName);
}
private static StoredMessage<InternalMessageMetaData> createReadOnlyHandle(final long messageNumber,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
index 507837c..f174497 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
@@ -67,7 +67,7 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal
@Override
public ServerMessage<InternalMessageMetaData> createMessage(final StoredMessage<InternalMessageMetaData> msg)
{
- return new InternalMessage(msg);
+ return new InternalMessage(msg, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0146ffd..da32b3d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -798,7 +798,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- InternalMessage internalMessage = InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent());
+ InternalMessage internalMessage = InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent(), address);
AutoCommitTransaction txn = new AutoCommitTransaction(getMessageStore());
final InstanceProperties instanceProperties =
new InstanceProperties()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
index 1b2cb8f..5a8dc84 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
@@ -48,6 +48,12 @@ public class AbstractServerMessageTest extends QpidTestCase
}
@Override
+ public String getTo()
+ {
+ return null;
+ }
+
+ @Override
public AMQMessageHeader getMessageHeader()
{
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 52d6e0f..50d9428 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -157,6 +157,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
+ public String getTo()
+ {
+ return null;
+ }
+
+ @Override
public long getSize()
{
return 0;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index fcb7c92..da61a64 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -97,6 +97,12 @@ class MockServerMessage implements ServerMessage
}
@Override
+ public String getTo()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public AMQMessageHeader getMessageHeader()
{
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index bb98133..8d0d9e6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
Object body = convertMessageBody(mimeType, data);
MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body);
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body, serverMessage.getTo());
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index e1d790b..de1aad1 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -216,6 +216,11 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
}
+ public String getExchange()
+ {
+ return _deliveryProps == null ? null : _deliveryProps.getExchange();
+ }
+
public AMQMessageHeader getMessageHeader()
{
return _messageHeader;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 1afa7ac..e6b2c8d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -56,6 +56,12 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
}
@Override
+ public String getTo()
+ {
+ return getMetaData().getExchange();
+ }
+
+ @Override
public AMQMessageHeader getMessageHeader()
{
return getMetaData().getMessageHeader();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7f4bb2e..b3a3266 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -423,7 +423,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
- final AMQMessage amqMessage = createAMQMessage(storedMessage);
+ final AMQMessage amqMessage = new AMQMessage(storedMessage, _connection.getReference());
try (MessageReference reference = amqMessage.newReference())
{
@@ -1230,15 +1230,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
return _clientDeliveryMethod;
}
-
- private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle)
- {
-
- AMQMessage message = new AMQMessage(handle, _connection.getReference());
-
- return message;
- }
-
public Subject getSubject()
{
return _subject;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 7dd94e2..faab357 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -74,6 +74,12 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
}
@Override
+ public String getTo()
+ {
+ return AMQShortString.toString(getMessagePublishInfo().getExchange());
+ }
+
+ @Override
public AMQMessageHeader getMessageHeader()
{
return getMessageMetaData().getMessageHeader();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 081ddcf..d16a239 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
Object body = convertMessageBody(mimeType, data);
return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
- new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body);
+ new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body, serverMessage.getTo());
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index 4ae80f3..fd53dc8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -75,7 +75,7 @@ public class AnonymousRelayDestination implements ReceivingDestination
public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
{
final ReceivingDestination destination;
- final String routingAddress = message.getInitialRoutingAddress();
+ final String routingAddress = message.getTo();
if (!routingAddress.startsWith("/") && routingAddress.contains("/"))
{
String[] parts = routingAddress.split("/", 2);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index a11f5ed..6cecd57 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -56,7 +56,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
{
Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject, serverMessage.getTo());
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 2cd18ab..25e7666 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -74,13 +74,8 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
public String getInitialRoutingAddress()
{
MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
- String routingAddress = null;
- final String to = messageHeader.getTo();
- if (to != null)
- {
- routingAddress = to;
- }
- else if (messageHeader.getHeader("routing-key") instanceof String)
+ final String routingAddress;
+ if (messageHeader.getHeader("routing-key") instanceof String)
{
routingAddress = (String) messageHeader.getHeader("routing-key");
}
@@ -100,6 +95,13 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
return routingAddress;
}
+ @Override
+ public String getTo()
+ {
+ MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
+ return messageHeader.getTo();
+ }
+
private MessageMetaData_1_0 getMessageMetaData()
{
MessageMetaData_1_0 metaData = getStoredMessage().getMetaData();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index ae161aa..7bb9044 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -84,9 +84,7 @@ public class NodeReceivingDestination implements ReceivingDestination
@Override
public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
{
- final String routingAddress = _routingAddress == null
- ? ReceivingDestination.getRoutingAddress(message, _address)
- : _routingAddress;
+ final String routingAddress = _routingAddress == null ? getRoutingAddress(message) : _routingAddress;
_destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
final InstanceProperties instanceProperties =
@@ -150,6 +148,20 @@ public class NodeReceivingDestination implements ReceivingDestination
return ACCEPTED;
}
+ private String getRoutingAddress(final ServerMessage<?> message)
+ {
+ String initialRoutingAddress = message.getInitialRoutingAddress();
+ if (initialRoutingAddress == null || "".equals(initialRoutingAddress))
+ {
+ initialRoutingAddress = message.getTo() == null ? "" : message.getTo();
+ }
+ if (_address != null && initialRoutingAddress.startsWith(_address + "/"))
+ {
+ initialRoutingAddress = initialRoutingAddress.substring(_address.length() + 1);
+ }
+ return initialRoutingAddress;
+ }
+
private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
{
Rejected rejected = new Rejected();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index fc82f49..a6a910f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -44,14 +44,4 @@ public interface ReceivingDestination
String getAddress();
MessageDestination getMessageDestination();
-
- static String getRoutingAddress(final ServerMessage<?> message, final String destinationName)
- {
- String initialRoutingAddress = message.getInitialRoutingAddress();
- if (destinationName != null && initialRoutingAddress.startsWith(destinationName + "/"))
- {
- initialRoutingAddress = initialRoutingAddress.substring(destinationName.length() + 1);
- }
- return initialRoutingAddress;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 7650546..e72ca8b 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -900,7 +900,7 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
else if (convertedValue instanceof Serializable)
{
return InternalMessage.createMessage(_addressSpace.getMessageStore(), responseHeader,
- (Serializable) convertedValue, false);
+ (Serializable) convertedValue, false, null);
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java b/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
new file mode 100644
index 0000000..d8de19b
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/MessageRoutingTest.java
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * System tests verifying that messages published through an exchange, or directly to a queue,
+ * are delivered to the expected queue.
+ * <p>
+ * Each test publishes a single text message, either with a producer created for the sending
+ * destination or with an anonymous producer (created with a {@code null} destination), and then
+ * consumes from {@code QUEUE_NAME} to check that the message arrived with its body intact.
+ */
+public class MessageRoutingTest extends QpidBrokerTestCase
+{
+ private static final String EXCHANGE_NAME = "testExchange";
+ private static final String QUEUE_NAME = "testQueue";
+ private static final String ROUTING_KEY = "testRoute";
+ private Connection _connection;
+ private Session _session;
+
+ /**
+ * Opens a connection and session, then uses AMQP management to create a direct exchange that
+ * rejects unroutable messages, create a queue, and bind the queue to the exchange with
+ * {@code ROUTING_KEY} as the binding key.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnectionBuilder().build();
+ _connection.start();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createEntityUsingAmqpManagement(EXCHANGE_NAME, _session, "org.apache.qpid.DirectExchange",
+ Collections.singletonMap(Exchange.UNROUTABLE_MESSAGE_BEHAVIOUR, "REJECT"));
+ createEntityUsingAmqpManagement(QUEUE_NAME, _session, "org.apache.qpid.Queue");
+
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination", QUEUE_NAME);
+ arguments.put("bindingKey", ROUTING_KEY);
+ performOperationUsingAmqpManagement(EXCHANGE_NAME, "bind", _session, "org.apache.qpid.Exchange",
+ arguments);
+ }
+
+ /**
+ * Publishes to the exchange with the JMS type set to the routing key and expects the message
+ * to arrive on the bound queue.
+ */
+ public void testRoutingWithSubjectSetAsJMSMessageType() throws Exception
+ {
+ Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+ message.setJMSType(ROUTING_KEY);
+
+ MessageProducer messageProducer = _session.createProducer(sendingDestination);
+ messageProducer.send(message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * As {@link #testRoutingWithSubjectSetAsJMSMessageType()}, but publishing with an anonymous
+ * producer that names the exchange at send time.
+ */
+ public void testAnonymousRelayRoutingWithSubjectSetAsJMSMessageType() throws Exception
+ {
+ Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+ message.setJMSType(ROUTING_KEY);
+
+ MessageProducer messageProducer = _session.createProducer(null);
+ messageProducer.send(sendingDestination, message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * Publishes to the exchange with the routing key carried in the {@code routing_key} string
+ * property and expects the message to arrive on the bound queue.
+ */
+ public void testRoutingWithRoutingKeySetAsJMSProperty() throws Exception
+ {
+ Destination sendingDestination = _session.createTopic(EXCHANGE_NAME);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+ message.setStringProperty("routing_key", ROUTING_KEY);
+
+ MessageProducer messageProducer = _session.createProducer(sendingDestination);
+ messageProducer.send(message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * Publishes to a destination of the form {@code <exchange>/<routing key>} and expects the
+ * message to arrive on the bound queue.
+ */
+ public void testRoutingWithExchangeAndRoutingKeyDestination() throws Exception
+ {
+ Destination sendingDestination = _session.createTopic(EXCHANGE_NAME + "/" + ROUTING_KEY);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+
+ MessageProducer messageProducer = _session.createProducer(sendingDestination);
+ messageProducer.send(message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * As {@link #testRoutingWithExchangeAndRoutingKeyDestination()}, but publishing with an
+ * anonymous producer that names the destination at send time.
+ */
+ public void testAnonymousRelayRoutingWithExchangeAndRoutingKeyDestination() throws Exception
+ {
+ Destination sendingDestination = _session.createTopic(EXCHANGE_NAME + "/" + ROUTING_KEY);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+
+ MessageProducer messageProducer = _session.createProducer(null);
+ messageProducer.send(sendingDestination, message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * Publishes directly to the queue and expects to consume the message from that same queue.
+ */
+ public void testRoutingToQueue() throws Exception
+ {
+ Destination sendingDestination = _session.createQueue(QUEUE_NAME);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+
+ MessageProducer messageProducer = _session.createProducer(sendingDestination);
+ messageProducer.send(message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+
+ /**
+ * As {@link #testRoutingToQueue()}, but publishing with an anonymous producer that names the
+ * queue at send time.
+ */
+ public void testAnonymousRelayRoutingToQueue() throws Exception
+ {
+ Destination sendingDestination = _session.createQueue(QUEUE_NAME);
+ Destination receivingDestination = _session.createQueue(QUEUE_NAME);
+
+ Message message = _session.createTextMessage("test");
+
+ MessageProducer messageProducer = _session.createProducer(null);
+ messageProducer.send(sendingDestination, message);
+
+ MessageConsumer messageConsumer = _session.createConsumer(receivingDestination);
+ Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
+
+ assertNotNull("Message not received", receivedMessage);
+ assertEquals("test", ((TextMessage)receivedMessage).getText());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 9f6a0df..82070f8 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -254,3 +254,6 @@ org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueu
// Tests require AMQP management
org.apache.qpid.server.routing.AlternateBindingRoutingTest#*
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index 19463d6..283f86d 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -90,4 +90,7 @@ org.apache.qpid.tests.protocol.v1_0.*
// QPID-7818: Messages published into non existing queue are discarded on 0-10 path
org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue
-org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueue
\ No newline at end of file
+org.apache.qpid.systest.AnonymousProducerTest#testSyncPublishIntoNonExistingQueue
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7da6ca8f/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index 334d67d..5f0f8f9 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -77,3 +77,6 @@ org.apache.qpid.systests.jms_2_0.*
// Exclude 1.0 protocol tests
org.apache.qpid.tests.protocol.v1_0.*
+
+// Tests AMQP 1.0 specific routing semantics
+org.apache.qpid.systest.MessageRoutingTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org