You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2021/12/01 09:28:56 UTC

[activemq-artemis] branch main updated (12a93e3 -> 4a91bcf)

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 12a93e3  Revert "ARTEMIS-3576 Fix toString methods throwing exceptions"
     new ad4f6a1  ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation
     new 185236f  ARTEMIS-3577 Save Core msg re-encoding due to msg copy
     new 7e6373d  ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages
     new 4a91bcf  This closes #3370

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/activemq/artemis/utils/ByteUtil.java    | 10 +++
 .../artemis/utils/collections/TypedProperties.java | 14 ++++-
 .../activemq/artemis/utils/ByteUtilTest.java       | 28 ++++++++-
 .../apache/activemq/artemis/api/core/Message.java  |  9 +--
 .../artemis/core/message/impl/CoreMessage.java     | 71 +++++++++++++++-------
 .../activemq/artemis/message/CoreMessageTest.java  | 31 ++++++++++
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  3 +-
 .../protocol/amqp/converter/TestConversions.java   |  4 +-
 .../core/persistence/impl/journal/LargeBody.java   |  2 +
 .../impl/journal/LargeServerMessageImpl.java       | 19 ++++--
 .../artemis/core/server/impl/DivertImpl.java       |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 52 ++++++++--------
 12 files changed, 180 insertions(+), 65 deletions(-)

[activemq-artemis] 03/04: ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7e6373d4dfa87b1fce690a1387c8eb9edce3af3b
Author: franz1981 <ni...@gmail.com>
AuthorDate: Tue Nov 16 10:11:13 2021 +0100

    ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages
---
 .../apache/activemq/artemis/api/core/Message.java  |  9 +++++----
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  3 ++-
 .../protocol/amqp/converter/TestConversions.java   |  4 ++--
 .../impl/journal/LargeServerMessageImpl.java       |  3 ++-
 .../artemis/core/server/impl/DivertImpl.java       |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 23 +++++++++-------------
 6 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 5dd3044..88f303f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -470,12 +470,13 @@ public interface Message {
       // only valid probably on AMQP
    }
 
-   default void referenceOriginalMessage(final Message original, String originalQueue) {
+   default void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
       setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-      setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
+      setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddressSimpleString());
       setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
-      if (original.getRoutingType() != null) {
-         setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
+      final RoutingType routingType = original.getRoutingType();
+      if (routingType != null) {
+         setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, routingType.getType());
       }
 
       // reset expiry
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 1026eeb..b921574 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
@@ -643,7 +644,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
    }
 
    @Override
-   public void referenceOriginalMessage(final Message original, String originalQueue) {
+   public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
 
       super.referenceOriginalMessage(original, originalQueue);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index a2fd675..90c2281 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -499,9 +499,9 @@ public class TestConversions extends Assert {
       for (int i = 0; i < 100; i++) {
          encodedMessage.setMessageID(333L);
          if (i % 3 == 0) {
-            encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
+            encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT"));
          } else {
-            encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
+            encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("XXX"));
          }
          encodedMessage.putStringProperty("another " + i, "value " + i);
          encodedMessage.messageChanged();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 6b9e9f1..5b228f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -265,7 +266,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    }
 
    @Override
-   public void referenceOriginalMessage(final Message original, String originalQueue) {
+   public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
 
       super.referenceOriginalMessage(original, originalQueue);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 6831f17..907ea99 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -105,7 +105,7 @@ public class DivertImpl implements Divert {
          copy = message.copy(id);
 
          // This will set the original MessageId, and the original address
-         copy.referenceOriginalMessage(message, this.getUniqueName().toString());
+         copy.referenceOriginalMessage(message, this.getUniqueName());
 
          copy.setAddress(forwardAddress);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 3c92b1c..d3da7e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2709,11 +2709,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   }
                }
 
-               if (targetQueue != null) {
-                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
-               } else {
-                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
-               }
+               move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue);
 
                return true;
             }
@@ -3386,7 +3382,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final MessageReference ref,
                      final boolean expiry,
                      final boolean rejectDuplicate,
-                     final long... queueIDs) throws Exception {
+                     final Long queueID) throws Exception {
       Message copyMessage = makeCopy(ref, expiry, toAddress);
 
       Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
@@ -3394,12 +3390,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
       }
 
-      if (queueIDs != null && queueIDs.length > 0) {
-         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
-         for (long id : queueIDs) {
-            buffer.putLong(id);
-         }
-         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
+      if (queueID != null) {
+         final byte[] encodedBuffer = new byte[Long.BYTES];
+         ByteBuffer buffer = ByteBuffer.wrap(encodedBuffer);
+         buffer.putLong(0, queueID);
+         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedBuffer);
       }
 
       postOffice.route(copyMessage, tx, false, rejectDuplicate);
@@ -3555,7 +3550,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (copyOriginalHeaders) {
-         copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
+         copy.referenceOriginalMessage(message, ref.getQueue().getName());
       }
 
       copy.setExpiration(0);
@@ -3584,7 +3579,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
             acknowledge(tx, ref, AckReason.EXPIRED, null);
          } else {
-            move(expiryAddress, tx, ref, true, false);
+            move(expiryAddress, tx, ref, true, false, null);
          }
       } else {
          if (!printErrorExpiring) {

[activemq-artemis] 02/04: ARTEMIS-3577 Save Core msg re-encoding due to msg copy

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 185236f74d10300028462ca62b70d7d489665d91
Author: franz1981 <ni...@gmail.com>
AuthorDate: Tue Nov 16 10:58:56 2021 +0100

    ARTEMIS-3577 Save Core msg re-encoding due to msg copy
---
 .../artemis/core/server/impl/QueueImpl.java        | 29 +++++++++++++---------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 13f0140..3c92b1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
-            return moveBetweenSnFQueues(queueSuffix, tx, ref);
+            return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
          }
       });
    }
@@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final boolean expiry,
                      final boolean rejectDuplicate,
                      final long... queueIDs) throws Exception {
-      Message copyMessage = makeCopy(ref, expiry);
-
-      copyMessage.setAddress(toAddress);
+      Message copyMessage = makeCopy(ref, expiry, toAddress);
 
       Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
       if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
    private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
-                                     final MessageReference ref) throws Exception {
-      Message copyMessage = makeCopy(ref, false, false);
+                                     final MessageReference ref,
+                                     final SimpleString newAddress) throws Exception {
+      Message copyMessage = makeCopy(ref, false, false, newAddress);
 
       byte[] oldRouteToIDs = null;
       String targetNodeID;
@@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return new Pair<>(targetNodeID, targetBinding);
    }
 
-   private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
-      return makeCopy(ref, expiry, true);
+   private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
+      return makeCopy(ref, expiry, true, newAddress);
    }
 
    private Message makeCopy(final MessageReference ref,
                             final boolean expiry,
-                            final boolean copyOriginalHeaders) throws Exception {
+                            final boolean copyOriginalHeaders,
+                            final SimpleString newAddress) throws Exception {
       if (ref == null) {
          ActiveMQServerLogger.LOGGER.nullRefMessage();
          throw new ActiveMQNullRefException("Reference to message is null");
@@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       Message copy = message.copy(newID, true);
 
+      if (newAddress != null) {
+         // setting it before checkLargeMessage:
+         // checkLargeMessage can cause msg encoding and setting it later invalidate it,
+         // forcing to be re-encoded later
+         copy.setAddress(newAddress);
+      }
+
       if (copyOriginalHeaders) {
          copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
       }
@@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          tx = new TransactionImpl(storageManager);
       }
 
-      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
-
-      copyMessage.setAddress(address);
+      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
 
       RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
 

[activemq-artemis] 01/04: ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ad4f6a133af9130f206a78e43ad387d4e3ee5f8f
Author: franz1981 <ni...@gmail.com>
AuthorDate: Sun Dec 6 00:07:29 2020 +0100

    ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation
---
 .../apache/activemq/artemis/utils/ByteUtil.java    | 10 +++
 .../artemis/utils/collections/TypedProperties.java | 14 ++++-
 .../activemq/artemis/utils/ByteUtilTest.java       | 28 ++++++++-
 .../artemis/core/message/impl/CoreMessage.java     | 71 +++++++++++++++-------
 .../activemq/artemis/message/CoreMessageTest.java  | 31 ++++++++++
 .../core/persistence/impl/journal/LargeBody.java   |  2 +
 .../impl/journal/LargeServerMessageImpl.java       | 16 +++--
 7 files changed, 142 insertions(+), 30 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index 11e895f..8c5fec9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -389,6 +389,16 @@ public class ByteUtil {
       return true;
    }
 
+   /**
+    * This ensure a more exact resizing then {@link ByteBuf#ensureWritable(int)}, if needed.<br>
+    * It won't try to trim a large enough buffer.
+    */
+   public static void ensureExactWritable(ByteBuf buffer, int minWritableBytes) {
+      if (buffer.maxFastWritableBytes() < minWritableBytes) {
+         buffer.capacity(buffer.writerIndex() + minWritableBytes);
+      }
+   }
+
 
    /**
     * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index cde5e1e..ac7a090 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.utils.AbstractByteBufPool;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
+import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
 import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
 import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
 import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
@@ -546,11 +547,18 @@ public class TypedProperties {
       decode(buffer, null);
    }
 
-
-   public synchronized void encode(final ByteBuf buffer) {
+   public synchronized int encode(final ByteBuf buffer) {
+      final int encodedSize;
+      // it's a trick to not pay the cost of buffer.writeIndex without assertions enabled
+      int writerIndex = 0;
+      assert (writerIndex = buffer.writerIndex()) >= 0 : "Always true";
       if (properties == null || size == 0) {
+         encodedSize = DataConstants.SIZE_BYTE;
+         ensureExactWritable(buffer, encodedSize);
          buffer.writeByte(DataConstants.NULL);
       } else {
+         encodedSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
+         ensureExactWritable(buffer, encodedSize);
          buffer.writeByte(DataConstants.NOT_NULL);
 
          buffer.writeInt(properties.size());
@@ -563,6 +571,8 @@ public class TypedProperties {
             value.write(buffer);
          });
       }
+      assert buffer.writerIndex() == (writerIndex + encodedSize) : "Bad encode size estimation";
+      return encodedSize;
    }
 
    public synchronized int getEncodeSize() {
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index 83fab14..b4eea72 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.util.internal.PlatformDependent;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
@@ -37,7 +39,7 @@ import static org.junit.Assert.fail;
 
 public class ByteUtilTest {
 
-   private static Logger log = Logger.getLogger(ByteUtilTest.class);
+   private static final Logger log = Logger.getLogger(ByteUtilTest.class);
 
    @Test
    public void testBytesToString() {
@@ -409,5 +411,29 @@ public class ByteUtilTest {
       assertArrayEquals(assertContent, convertedContent);
    }
 
+   @Test(expected = IllegalArgumentException.class)
+   public void shouldEnsureExactWritableFailToEnlargeWrappedByteBuf() {
+      byte[] wrapped = new byte[32];
+      ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
+      buffer.writerIndex(wrapped.length);
+      ByteUtil.ensureExactWritable(buffer, 1);
+   }
+
+   @Test
+   public void shouldEnsureExactWritableNotEnlargeBufferWithEnoughSpace() {
+      byte[] wrapped = new byte[32];
+      ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
+      buffer.writerIndex(wrapped.length - 1);
+      ByteUtil.ensureExactWritable(buffer, 1);
+      Assert.assertSame(wrapped, buffer.array());
+   }
+
+   @Test
+   public void shouldEnsureExactWritableEnlargeBufferWithoutEnoughSpace() {
+      ByteBuf buffer = Unpooled.buffer(32);
+      buffer.writerIndex(32);
+      ByteUtil.ensureExactWritable(buffer, 1);
+      Assert.assertEquals(33, buffer.capacity());
+   }
 
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 3b21011..6c55c3f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -60,13 +60,15 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
+
 /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
  *  consumers */
 public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
 
-   private volatile int memoryEstimate = -1;
+   protected volatile int memoryEstimate = -1;
 
    private static final Logger logger = Logger.getLogger(CoreMessage.class);
 
@@ -81,8 +83,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
 
-   Object body;
-
    protected int endOfBodyPosition = -1;
 
    protected int messageIDPosition = -1;
@@ -445,7 +445,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
       // many subscriptions and bridging to other nodes in a cluster
       synchronized (other) {
-         this.body = other.body;
          this.endOfBodyPosition = other.endOfBodyPosition;
          internalSetMessageID(other.messageID);
          this.address = other.address;
@@ -641,6 +640,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
+         if (buffer != null && !isLargeMessage()) {
+            if (!validBuffer) {
+               // this can happen if a message is modified
+               // eg clustered messages get additional routing information
+               // that need to be correctly accounted in memory
+               checkEncode();
+            }
+         }
+         final TypedProperties properties = this.properties;
          memoryEstimate = memoryOffset +
             (buffer != null ? buffer.capacity() : 0) +
             (properties != null ? properties.getMemoryOffset() : 0);
@@ -733,11 +741,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          endOfBodyPosition = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
       }
 
-      buffer.setIndex(0, 0);
-      buffer.writeInt(endOfBodyPosition);
-
+      buffer.setInt(0, endOfBodyPosition);
       // The end of body position
-      buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      buffer.setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
 
       encodeHeadersAndProperties(buffer);
 
@@ -748,21 +754,42 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    public void encodeHeadersAndProperties(final ByteBuf buffer) {
       final TypedProperties properties = getProperties();
-      messageIDPosition = buffer.writerIndex();
-      buffer.writeLong(messageID);
-      SimpleString.writeNullableSimpleString(buffer, address);
-      if (userID == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         buffer.writeBytes(userID.asBytes());
+      final int initialWriterIndex = buffer.writerIndex();
+      messageIDPosition = initialWriterIndex;
+      final UUID userID = this.userID;
+      final int userIDEncodedSize = userID == null ? Byte.BYTES : Byte.BYTES + userID.asBytes().length;
+      final SimpleString address = this.address;
+      final int addressEncodedBytes = SimpleString.sizeofNullableString(address);
+      final int headersSize =
+         Long.BYTES +                                    // messageID
+         addressEncodedBytes +                           // address
+         userIDEncodedSize +                             // userID
+         Byte.BYTES +                                    // type
+         Byte.BYTES +                                    // durable
+         Long.BYTES +                                    // expiration
+         Long.BYTES +                                    // timestamp
+         Byte.BYTES;                                     // priority
+      synchronized (properties) {
+         final int propertiesEncodeSize = properties.getEncodeSize();
+         final int totalEncodedSize = headersSize + propertiesEncodeSize;
+         ensureExactWritable(buffer, totalEncodedSize);
+         buffer.writeLong(messageID);
+         SimpleString.writeNullableSimpleString(buffer, address);
+         if (userID == null) {
+            buffer.writeByte(DataConstants.NULL);
+         } else {
+            buffer.writeByte(DataConstants.NOT_NULL);
+            buffer.writeBytes(userID.asBytes());
+         }
+         buffer.writeByte(type);
+         buffer.writeBoolean(durable);
+         buffer.writeLong(expiration);
+         buffer.writeLong(timestamp);
+         buffer.writeByte(priority);
+         assert buffer.writerIndex() == initialWriterIndex + headersSize : "Bad Headers encode size estimation";
+         final int realPropertiesEncodeSize = properties.encode(buffer);
+         assert realPropertiesEncodeSize == propertiesEncodeSize : "TypedProperties has a wrong encode size estimation or is being modified concurrently";
       }
-      buffer.writeByte(type);
-      buffer.writeBoolean(durable);
-      buffer.writeLong(expiration);
-      buffer.writeLong(timestamp);
-      buffer.writeByte(priority);
-      properties.encode(buffer);
    }
 
    @Override
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index c52daa4..ef0ef04 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -38,6 +38,9 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
 public class CoreMessageTest {
 
    public static final SimpleString ADDRESS = new SimpleString("this.local.address");
@@ -359,6 +362,34 @@ public class CoreMessageTest {
 
    }
 
+   @Test
+   public void testMemoryEstimateChangedAfterModifiedAlreadyEncodedCopy() {
+      final CoreMessage msg = new CoreMessage(1, 44);
+      msg.getEncodeSize();
+      final int memoryEstimate = msg.getMemoryEstimate();
+      final CoreMessage copy = (CoreMessage) msg.copy(2);
+      copy.getEncodeSize();
+      copy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, new byte[Long.BYTES]);
+      final int increasedMemoryFootprint = copy.getMemoryEstimate() - memoryEstimate;
+      final int increasedPropertyFootprint = copy.getProperties().getMemoryOffset() - msg.getProperties().getMemoryOffset();
+      assertThat("memory estimation isn't accounting for the additional encoded property",
+                 increasedMemoryFootprint, greaterThan(increasedPropertyFootprint));
+   }
+
+   @Test
+   public void testMessageBufferCapacityMatchEncodedSizeAfterModifiedCopy() {
+      final CoreMessage msg = new CoreMessage(1, 4155);
+      msg.setAddress("a");
+      msg.putBytesProperty("_", new byte[4096]);
+      final CoreMessage copy = (CoreMessage) msg.copy(2);
+      Assert.assertEquals(msg.getEncodeSize(), copy.getBuffer().capacity());
+      copy.setAddress("b");
+      copy.setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, "a");
+      copy.setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, msg.getAddressSimpleString());
+      copy.setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, msg.getMessageID());
+      Assert.assertEquals(copy.getEncodeSize(), copy.getBuffer().capacity());
+   }
+
    private void printVariable(String body, String encode) {
       System.out.println("// body = \"" + body + "\";");
       System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 1eacecd..e394660 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -37,6 +37,8 @@ import org.jboss.logging.Logger;
 
 public class LargeBody {
 
+   static final int MEMORY_OFFSET = 56;
+
    private static final Logger logger = Logger.getLogger(LargeBody.class);
 
    private long bodySize = -1;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 267e1ef..6b9e9f1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -33,6 +33,11 @@ import org.jboss.logging.Logger;
 
 public final class LargeServerMessageImpl extends CoreMessage implements CoreLargeServerMessage {
 
+   // Given that LargeBody is never null it needs to be accounted on this instance footprint.
+   // This value has been computed using https://github.com/openjdk/jol
+   // with HotSpot 64-bit COOPS 8-byte align
+   private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET;
+
    @Override
    public Message toMessage() {
       return this;
@@ -73,9 +78,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
 
    private final StorageManager storageManager;
 
-   // We cache this
-   private volatile int memoryEstimate = -1;
-
    public LargeServerMessageImpl(final StorageManager storageManager) {
       largeBody = new LargeBody(this, storageManager);
       this.storageManager = storageManager;
@@ -243,8 +245,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    public int getMemoryEstimate() {
       synchronized (largeBody) {
          if (memoryEstimate == -1) {
-            // The body won't be on memory (aways on-file), so we don't consider this for paging
-            memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1;
+            // The body won't be on memory (always on-file), so we don't consider this for paging
+            memoryEstimate = MEMORY_OFFSET +
+               getHeadersAndPropertiesEncodeSize() +
+               DataConstants.SIZE_INT +
+               getEncodeSize() +
+               (16 + 4) * 2 + 1;
          }
 
          return memoryEstimate;

[activemq-artemis] 04/04: This closes #3370

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4a91bcfd58f9b49a0bf316e8b28b0525d54098c5
Merge: 12a93e3 7e6373d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Wed Dec 1 10:27:47 2021 +0100

    This closes #3370

 .../apache/activemq/artemis/utils/ByteUtil.java    | 10 +++
 .../artemis/utils/collections/TypedProperties.java | 14 ++++-
 .../activemq/artemis/utils/ByteUtilTest.java       | 28 ++++++++-
 .../apache/activemq/artemis/api/core/Message.java  |  9 +--
 .../artemis/core/message/impl/CoreMessage.java     | 71 +++++++++++++++-------
 .../activemq/artemis/message/CoreMessageTest.java  | 31 ++++++++++
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  3 +-
 .../protocol/amqp/converter/TestConversions.java   |  4 +-
 .../core/persistence/impl/journal/LargeBody.java   |  2 +
 .../impl/journal/LargeServerMessageImpl.java       | 19 ++++--
 .../artemis/core/server/impl/DivertImpl.java       |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 52 ++++++++--------
 12 files changed, 180 insertions(+), 65 deletions(-)