You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/15 15:03:18 UTC
[5/5] activemq-artemis git commit: ARTEMIS-734 Message expired while
being moved on the cluster bridge does not follow the address setting
configuration.
ARTEMIS-734 Message expired while being moved on the cluster bridge does not follow the address setting configuration.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2509eeb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2509eeb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2509eeb8
Branch: refs/heads/master
Commit: 2509eeb818078627838ee2cd5b828fc98c514dfa
Parents: 2ed51bf
Author: Vlado Pakan <vp...@redhat.com>
Authored: Wed Sep 14 18:43:34 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Sep 15 16:02:49 2016 +0100
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 46 ++++++++++++++++++--
1 file changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2509eeb8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 1514748..99ce43f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1100,13 +1100,17 @@ public class QueueImpl implements Queue {
}
}
- @Override
public void expire(final MessageReference ref) throws Exception {
- if (expiryAddress != null) {
+ SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref);
+ if (messageExpiryAddress == null) {
+ messageExpiryAddress = expiryAddressFromAddressSettings(ref);
+ }
+
+ if (messageExpiryAddress != null) {
if (logger.isTraceEnabled()) {
- logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
+ logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
}
- move(null, expiryAddress, ref, false, AckReason.EXPIRED);
+ move(null, messageExpiryAddress, ref, false, AckReason.EXPIRED);
}
else {
if (logger.isTraceEnabled()) {
@@ -1116,6 +1120,40 @@ public class QueueImpl implements Queue {
}
}
+ private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {
+ SimpleString messageAddress = extractAddress(ref.getMessage());
+ SimpleString expiryAddress = null;
+
+ if (messageAddress == null || messageAddress.equals(getAddress())) {
+ expiryAddress = getExpiryAddress();
+ }
+
+ return expiryAddress;
+ }
+
+ private SimpleString expiryAddressFromAddressSettings(MessageReference ref) {
+ SimpleString messageAddress = extractAddress(ref.getMessage());
+ SimpleString expiryAddress = null;
+
+ if (messageAddress != null) {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(messageAddress.toString());
+
+ expiryAddress = addressSettings.getExpiryAddress();
+ }
+
+ return expiryAddress;
+ }
+
+ private SimpleString extractAddress(ServerMessage message) {
+ if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
+ return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+ }
+ else {
+ return message.getAddress();
+ }
+ }
+
+
@Override
public SimpleString getExpiryAddress() {
return this.expiryAddress;