You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2021/12/01 09:28:59 UTC

[activemq-artemis] 03/04: ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7e6373d4dfa87b1fce690a1387c8eb9edce3af3b
Author: franz1981 <ni...@gmail.com>
AuthorDate: Tue Nov 16 10:11:13 2021 +0100

    ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages
---
 .../apache/activemq/artemis/api/core/Message.java  |  9 +++++----
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  3 ++-
 .../protocol/amqp/converter/TestConversions.java   |  4 ++--
 .../impl/journal/LargeServerMessageImpl.java       |  3 ++-
 .../artemis/core/server/impl/DivertImpl.java       |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 23 +++++++++-------------
 6 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 5dd3044..88f303f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -470,12 +470,13 @@ public interface Message {
       // only valid probably on AMQP
    }
 
-   default void referenceOriginalMessage(final Message original, String originalQueue) {
+   default void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
       setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-      setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
+      setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddressSimpleString());
       setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
-      if (original.getRoutingType() != null) {
-         setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
+      final RoutingType routingType = original.getRoutingType();
+      if (routingType != null) {
+         setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, routingType.getType());
       }
 
       // reset expiry
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 1026eeb..b921574 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
@@ -643,7 +644,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
    }
 
    @Override
-   public void referenceOriginalMessage(final Message original, String originalQueue) {
+   public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
 
       super.referenceOriginalMessage(original, originalQueue);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index a2fd675..90c2281 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -499,9 +499,9 @@ public class TestConversions extends Assert {
       for (int i = 0; i < 100; i++) {
          encodedMessage.setMessageID(333L);
          if (i % 3 == 0) {
-            encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
+            encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT"));
          } else {
-            encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
+            encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("XXX"));
          }
          encodedMessage.putStringProperty("another " + i, "value " + i);
          encodedMessage.messageChanged();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 6b9e9f1..5b228f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -265,7 +266,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    }
 
    @Override
-   public void referenceOriginalMessage(final Message original, String originalQueue) {
+   public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
 
       super.referenceOriginalMessage(original, originalQueue);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 6831f17..907ea99 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -105,7 +105,7 @@ public class DivertImpl implements Divert {
          copy = message.copy(id);
 
          // This will set the original MessageId, and the original address
-         copy.referenceOriginalMessage(message, this.getUniqueName().toString());
+         copy.referenceOriginalMessage(message, this.getUniqueName());
 
          copy.setAddress(forwardAddress);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 3c92b1c..d3da7e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2709,11 +2709,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   }
                }
 
-               if (targetQueue != null) {
-                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
-               } else {
-                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
-               }
+               move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue);
 
                return true;
             }
@@ -3386,7 +3382,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final MessageReference ref,
                      final boolean expiry,
                      final boolean rejectDuplicate,
-                     final long... queueIDs) throws Exception {
+                     final Long queueID) throws Exception {
       Message copyMessage = makeCopy(ref, expiry, toAddress);
 
       Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
@@ -3394,12 +3390,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
       }
 
-      if (queueIDs != null && queueIDs.length > 0) {
-         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
-         for (long id : queueIDs) {
-            buffer.putLong(id);
-         }
-         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
+      if (queueID != null) {
+         final byte[] encodedBuffer = new byte[Long.BYTES];
+         ByteBuffer buffer = ByteBuffer.wrap(encodedBuffer);
+         buffer.putLong(0, queueID);
+         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedBuffer);
       }
 
       postOffice.route(copyMessage, tx, false, rejectDuplicate);
@@ -3555,7 +3550,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (copyOriginalHeaders) {
-         copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
+         copy.referenceOriginalMessage(message, ref.getQueue().getName());
       }
 
       copy.setExpiration(0);
@@ -3584,7 +3579,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
             acknowledge(tx, ref, AckReason.EXPIRED, null);
          } else {
-            move(expiryAddress, tx, ref, true, false);
+            move(expiryAddress, tx, ref, true, false, null);
          }
       } else {
          if (!printErrorExpiring) {