You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/08 22:55:49 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1834 don't alter STOMP
'destination' header when using prefix
Repository: activemq-artemis
Updated Branches:
refs/heads/master a66b7dda2 -> 845f03dce
ARTEMIS-1834 don't alter STOMP 'destination' header when using prefix
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/90a604da
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/90a604da
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/90a604da
Branch: refs/heads/master
Commit: 90a604da202196fd4556aea30a889d2c7fe65ccd
Parents: a66b7dd
Author: Justin Bertram <jb...@apache.org>
Authored: Tue May 8 15:24:10 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue May 8 15:28:46 2018 -0500
----------------------------------------------------------------------
.../org/apache/activemq/artemis/api/core/Message.java | 6 ++++++
.../org/apache/activemq/artemis/utils/PrefixUtil.java | 13 +++++++++++++
.../artemis/core/protocol/stomp/StompUtils.java | 10 ++++++++--
.../protocol/stomp/VersionedStompFrameHandler.java | 11 ++++++++++-
.../activemq/artemis/core/server/ServerSession.java | 8 ++++++++
.../artemis/core/server/impl/ServerSessionImpl.java | 8 ++++++++
.../artemis/tests/integration/stomp/StompTest.java | 4 ++++
7 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 031c426..6ca37ea 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -153,6 +153,12 @@ public interface Message {
*/
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
+ /**
+ * The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
+ * the prefix when the message is consumed.
+ */
+ SimpleString HDR_PREFIX = new SimpleString("_AMQ_PREFIX");
+
byte DEFAULT_TYPE = 0;
byte OBJECT_TYPE = 2;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
index 9c6e92a..4066986 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -45,7 +45,20 @@ public class PrefixUtil {
return address;
}
+ public static SimpleString getPrefix(SimpleString address, Map<SimpleString, RoutingType> prefixes) {
+ for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+ if (address.startsWith(entry.getKey())) {
+ return removeAddress(address, entry.getKey());
+ }
+ }
+ return null;
+ }
+
public static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
return string.subSeq(prefix.length(), string.length());
}
+
+ public static SimpleString removeAddress(SimpleString string, SimpleString prefix) {
+ return string.subSeq(0, prefix.length());
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index cd17982..07dcd8f 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -38,7 +38,7 @@ public class StompUtils {
// Static --------------------------------------------------------
- public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
+ public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg, String prefix) throws Exception {
Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
@@ -89,6 +89,10 @@ public class StompUtils {
}
}
+ if (prefix != null) {
+ msg.putStringProperty(Message.HDR_PREFIX, prefix);
+ }
+
// now the general headers
for (Entry<String, String> entry : headers.entrySet()) {
String name = entry.getKey();
@@ -101,7 +105,8 @@ public class StompUtils {
StompFrame command,
int deliveryCount) throws Exception {
command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
- command.addHeader(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
+ SimpleString prefix = message.getSimpleStringProperty(Message.HDR_PREFIX);
+ command.addHeader(Stomp.Headers.Message.DESTINATION, (prefix == null ? "" : prefix) + message.getAddress());
if (message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME) != null) {
command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME).toString());
@@ -135,6 +140,7 @@ public class StompUtils {
name.equals(Message.HDR_CONTENT_TYPE) ||
name.equals(Message.HDR_VALIDATED_USER) ||
name.equals(Message.HDR_ROUTING_TYPE) ||
+ name.equals(Message.HDR_PREFIX) ||
name.equals(MessageUtil.TYPE_HEADER_NAME) ||
name.equals(MessageUtil.CORRELATIONID_HEADER_NAME) ||
name.toString().equals(Stomp.Headers.Message.DESTINATION)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 941cee6..023d885 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -200,7 +200,7 @@ public abstract class VersionedStompFrameHandler {
}
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message, getPrefix(frame));
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH)) {
message.setType(Message.BYTES_TYPE);
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
@@ -291,6 +291,15 @@ public abstract class VersionedStompFrameHandler {
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
}
+ public String getPrefix(StompFrame request) throws ActiveMQStompException {
+ String destination = request.getHeader(Headers.Send.DESTINATION);
+ if (destination == null) {
+ return null;
+ }
+ SimpleString prefix = connection.getSession().getCoreSession().getPrefix(SimpleString.toSimpleString(destination));
+ return prefix == null ? null : prefix.toString();
+ }
+
public StompFrame postprocess(StompFrame request) {
StompFrame response = null;
if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 59a400b..f5cca75 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -316,6 +316,14 @@ public interface ServerSession extends SecurityAuth {
SimpleString removePrefix(SimpleString address);
/**
+ * Get the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor.
+ *
+ * @param address the address to inspect
+ * @return the canonical (i.e. non-prefixed) address name
+ */
+ SimpleString getPrefix(SimpleString address);
+
+ /**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param addressInfo the address to inspect
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f92b45a..0c6838d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1835,6 +1835,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
+ public SimpleString getPrefix(SimpleString address) {
+ if (prefixEnabled && address != null) {
+ return PrefixUtil.getPrefix(address, prefixes);
+ }
+ return null;
+ }
+
+ @Override
public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
if (prefixEnabled) {
return addressInfo.getAddressAndRoutingType(prefixes);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90a604da/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index de6a11d..bc363f2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1621,6 +1621,7 @@ public class StompTest extends StompTestBase {
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 2", frame.getBody());
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
@@ -1643,6 +1644,7 @@ public class StompTest extends StompTestBase {
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 3", frame.getBody());
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
@@ -1699,6 +1701,7 @@ public class StompTest extends StompTestBase {
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 2", frame.getBody());
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
@@ -1718,6 +1721,7 @@ public class StompTest extends StompTestBase {
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 1", frame.getBody());
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
frame = conn.receiveFrame(2000);
Assert.assertNull(frame);
[2/2] activemq-artemis git commit: This closes #2077
Posted by cl...@apache.org.
This closes #2077
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/845f03dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/845f03dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/845f03dc
Branch: refs/heads/master
Commit: 845f03dce3eb1f9b99e9ed841368fb0903f9026d
Parents: a66b7dd 90a604d
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 8 18:55:41 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 8 18:55:41 2018 -0400
----------------------------------------------------------------------
.../org/apache/activemq/artemis/api/core/Message.java | 6 ++++++
.../org/apache/activemq/artemis/utils/PrefixUtil.java | 13 +++++++++++++
.../artemis/core/protocol/stomp/StompUtils.java | 10 ++++++++--
.../protocol/stomp/VersionedStompFrameHandler.java | 11 ++++++++++-
.../activemq/artemis/core/server/ServerSession.java | 8 ++++++++
.../artemis/core/server/impl/ServerSessionImpl.java | 8 ++++++++
.../artemis/tests/integration/stomp/StompTest.java | 4 ++++
7 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------