You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2021/12/01 09:28:58 UTC

[activemq-artemis] 02/04: ARTEMIS-3577 Save Core msg re-encoding due to msg copy

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 185236f74d10300028462ca62b70d7d489665d91
Author: franz1981 <ni...@gmail.com>
AuthorDate: Tue Nov 16 10:58:56 2021 +0100

    ARTEMIS-3577 Save Core msg re-encoding due to msg copy
---
 .../artemis/core/server/impl/QueueImpl.java        | 29 +++++++++++++---------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 13f0140..3c92b1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
-            return moveBetweenSnFQueues(queueSuffix, tx, ref);
+            return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
          }
       });
    }
@@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final boolean expiry,
                      final boolean rejectDuplicate,
                      final long... queueIDs) throws Exception {
-      Message copyMessage = makeCopy(ref, expiry);
-
-      copyMessage.setAddress(toAddress);
+      Message copyMessage = makeCopy(ref, expiry, toAddress);
 
       Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
       if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
    private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
-                                     final MessageReference ref) throws Exception {
-      Message copyMessage = makeCopy(ref, false, false);
+                                     final MessageReference ref,
+                                     final SimpleString newAddress) throws Exception {
+      Message copyMessage = makeCopy(ref, false, false, newAddress);
 
       byte[] oldRouteToIDs = null;
       String targetNodeID;
@@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return new Pair<>(targetNodeID, targetBinding);
    }
 
-   private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
-      return makeCopy(ref, expiry, true);
+   private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
+      return makeCopy(ref, expiry, true, newAddress);
    }
 
    private Message makeCopy(final MessageReference ref,
                             final boolean expiry,
-                            final boolean copyOriginalHeaders) throws Exception {
+                            final boolean copyOriginalHeaders,
+                            final SimpleString newAddress) throws Exception {
       if (ref == null) {
          ActiveMQServerLogger.LOGGER.nullRefMessage();
          throw new ActiveMQNullRefException("Reference to message is null");
@@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       Message copy = message.copy(newID, true);
 
+      if (newAddress != null) {
+         // setting it before checkLargeMessage:
+         // checkLargeMessage can cause msg encoding and setting it later invalidate it,
+         // forcing to be re-encoded later
+         copy.setAddress(newAddress);
+      }
+
       if (copyOriginalHeaders) {
          copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
       }
@@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          tx = new TransactionImpl(storageManager);
       }
 
-      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
-
-      copyMessage.setAddress(address);
+      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
 
       RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);