You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2021/12/01 09:28:58 UTC
[activemq-artemis] 02/04: ARTEMIS-3577 Save Core msg re-encoding due to msg copy
This is an automated email from the ASF dual-hosted git repository.
nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 185236f74d10300028462ca62b70d7d489665d91
Author: franz1981 <ni...@gmail.com>
AuthorDate: Tue Nov 16 10:58:56 2021 +0100
ARTEMIS-3577 Save Core msg re-encoding due to msg copy
---
.../artemis/core/server/impl/QueueImpl.java | 29 +++++++++++++---------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 13f0140..3c92b1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
- return moveBetweenSnFQueues(queueSuffix, tx, ref);
+ return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
}
});
}
@@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final boolean expiry,
final boolean rejectDuplicate,
final long... queueIDs) throws Exception {
- Message copyMessage = makeCopy(ref, expiry);
-
- copyMessage.setAddress(toAddress);
+ Message copyMessage = makeCopy(ref, expiry, toAddress);
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
- final MessageReference ref) throws Exception {
- Message copyMessage = makeCopy(ref, false, false);
+ final MessageReference ref,
+ final SimpleString newAddress) throws Exception {
+ Message copyMessage = makeCopy(ref, false, false, newAddress);
byte[] oldRouteToIDs = null;
String targetNodeID;
@@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new Pair<>(targetNodeID, targetBinding);
}
- private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
- return makeCopy(ref, expiry, true);
+ private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
+ return makeCopy(ref, expiry, true, newAddress);
}
private Message makeCopy(final MessageReference ref,
final boolean expiry,
- final boolean copyOriginalHeaders) throws Exception {
+ final boolean copyOriginalHeaders,
+ final SimpleString newAddress) throws Exception {
if (ref == null) {
ActiveMQServerLogger.LOGGER.nullRefMessage();
throw new ActiveMQNullRefException("Reference to message is null");
@@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Message copy = message.copy(newID, true);
+ if (newAddress != null) {
+ // setting it before checkLargeMessage:
+ // checkLargeMessage can cause msg encoding and setting it later invalidate it,
+ // forcing to be re-encoded later
+ copy.setAddress(newAddress);
+ }
+
if (copyOriginalHeaders) {
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
}
@@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
tx = new TransactionImpl(storageManager);
}
- Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
-
- copyMessage.setAddress(address);
+ Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);