You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2021/12/01 09:28:57 UTC

[activemq-artemis] 01/04: ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ad4f6a133af9130f206a78e43ad387d4e3ee5f8f
Author: franz1981 <ni...@gmail.com>
AuthorDate: Sun Dec 6 00:07:29 2020 +0100

    ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation
---
 .../apache/activemq/artemis/utils/ByteUtil.java    | 10 +++
 .../artemis/utils/collections/TypedProperties.java | 14 ++++-
 .../activemq/artemis/utils/ByteUtilTest.java       | 28 ++++++++-
 .../artemis/core/message/impl/CoreMessage.java     | 71 +++++++++++++++-------
 .../activemq/artemis/message/CoreMessageTest.java  | 31 ++++++++++
 .../core/persistence/impl/journal/LargeBody.java   |  2 +
 .../impl/journal/LargeServerMessageImpl.java       | 16 +++--
 7 files changed, 142 insertions(+), 30 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index 11e895f..8c5fec9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -389,6 +389,16 @@ public class ByteUtil {
       return true;
    }
 
+   /**
+    * This ensures a more exact resizing than {@link ByteBuf#ensureWritable(int)}, if needed.<br>
+    * It won't try to trim a large enough buffer.
+    */
+   public static void ensureExactWritable(ByteBuf buffer, int minWritableBytes) {
+      if (buffer.maxFastWritableBytes() < minWritableBytes) {
+         buffer.capacity(buffer.writerIndex() + minWritableBytes);
+      }
+   }
+
 
    /**
     * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index cde5e1e..ac7a090 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.utils.AbstractByteBufPool;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
+import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
 import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
 import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
 import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
@@ -546,11 +547,18 @@ public class TypedProperties {
       decode(buffer, null);
    }
 
-
-   public synchronized void encode(final ByteBuf buffer) {
+   public synchronized int encode(final ByteBuf buffer) {
+      final int encodedSize;
+      // it's a trick to avoid paying the cost of buffer.writerIndex() when assertions are disabled
+      int writerIndex = 0;
+      assert (writerIndex = buffer.writerIndex()) >= 0 : "Always true";
       if (properties == null || size == 0) {
+         encodedSize = DataConstants.SIZE_BYTE;
+         ensureExactWritable(buffer, encodedSize);
          buffer.writeByte(DataConstants.NULL);
       } else {
+         encodedSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
+         ensureExactWritable(buffer, encodedSize);
          buffer.writeByte(DataConstants.NOT_NULL);
 
          buffer.writeInt(properties.size());
@@ -563,6 +571,8 @@ public class TypedProperties {
             value.write(buffer);
          });
       }
+      assert buffer.writerIndex() == (writerIndex + encodedSize) : "Bad encode size estimation";
+      return encodedSize;
    }
 
    public synchronized int getEncodeSize() {
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index 83fab14..b4eea72 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.util.internal.PlatformDependent;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
@@ -37,7 +39,7 @@ import static org.junit.Assert.fail;
 
 public class ByteUtilTest {
 
-   private static Logger log = Logger.getLogger(ByteUtilTest.class);
+   private static final Logger log = Logger.getLogger(ByteUtilTest.class);
 
    @Test
    public void testBytesToString() {
@@ -409,5 +411,29 @@ public class ByteUtilTest {
       assertArrayEquals(assertContent, convertedContent);
    }
 
+   @Test(expected = IllegalArgumentException.class)
+   public void shouldEnsureExactWritableFailToEnlargeWrappedByteBuf() {
+      byte[] wrapped = new byte[32];
+      ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
+      buffer.writerIndex(wrapped.length);
+      ByteUtil.ensureExactWritable(buffer, 1);
+   }
+
+   @Test
+   public void shouldEnsureExactWritableNotEnlargeBufferWithEnoughSpace() {
+      byte[] wrapped = new byte[32];
+      ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
+      buffer.writerIndex(wrapped.length - 1);
+      ByteUtil.ensureExactWritable(buffer, 1);
+      Assert.assertSame(wrapped, buffer.array());
+   }
+
+   @Test
+   public void shouldEnsureExactWritableEnlargeBufferWithoutEnoughSpace() {
+      ByteBuf buffer = Unpooled.buffer(32);
+      buffer.writerIndex(32);
+      ByteUtil.ensureExactWritable(buffer, 1);
+      Assert.assertEquals(33, buffer.capacity());
+   }
 
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 3b21011..6c55c3f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -60,13 +60,15 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
+
 /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
  *  consumers */
 public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
 
-   private volatile int memoryEstimate = -1;
+   protected volatile int memoryEstimate = -1;
 
    private static final Logger logger = Logger.getLogger(CoreMessage.class);
 
@@ -81,8 +83,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
 
-   Object body;
-
    protected int endOfBodyPosition = -1;
 
    protected int messageIDPosition = -1;
@@ -445,7 +445,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
       // many subscriptions and bridging to other nodes in a cluster
       synchronized (other) {
-         this.body = other.body;
          this.endOfBodyPosition = other.endOfBodyPosition;
          internalSetMessageID(other.messageID);
          this.address = other.address;
@@ -641,6 +640,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
+         if (buffer != null && !isLargeMessage()) {
+            if (!validBuffer) {
+               // this can happen if a message is modified
+               // e.g. clustered messages get additional routing information
+               // that needs to be correctly accounted for in memory
+               checkEncode();
+            }
+         }
+         final TypedProperties properties = this.properties;
          memoryEstimate = memoryOffset +
             (buffer != null ? buffer.capacity() : 0) +
             (properties != null ? properties.getMemoryOffset() : 0);
@@ -733,11 +741,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          endOfBodyPosition = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
       }
 
-      buffer.setIndex(0, 0);
-      buffer.writeInt(endOfBodyPosition);
-
+      buffer.setInt(0, endOfBodyPosition);
       // The end of body position
-      buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      buffer.setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
 
       encodeHeadersAndProperties(buffer);
 
@@ -748,21 +754,42 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    public void encodeHeadersAndProperties(final ByteBuf buffer) {
       final TypedProperties properties = getProperties();
-      messageIDPosition = buffer.writerIndex();
-      buffer.writeLong(messageID);
-      SimpleString.writeNullableSimpleString(buffer, address);
-      if (userID == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         buffer.writeBytes(userID.asBytes());
+      final int initialWriterIndex = buffer.writerIndex();
+      messageIDPosition = initialWriterIndex;
+      final UUID userID = this.userID;
+      final int userIDEncodedSize = userID == null ? Byte.BYTES : Byte.BYTES + userID.asBytes().length;
+      final SimpleString address = this.address;
+      final int addressEncodedBytes = SimpleString.sizeofNullableString(address);
+      final int headersSize =
+         Long.BYTES +                                    // messageID
+         addressEncodedBytes +                           // address
+         userIDEncodedSize +                             // userID
+         Byte.BYTES +                                    // type
+         Byte.BYTES +                                    // durable
+         Long.BYTES +                                    // expiration
+         Long.BYTES +                                    // timestamp
+         Byte.BYTES;                                     // priority
+      synchronized (properties) {
+         final int propertiesEncodeSize = properties.getEncodeSize();
+         final int totalEncodedSize = headersSize + propertiesEncodeSize;
+         ensureExactWritable(buffer, totalEncodedSize);
+         buffer.writeLong(messageID);
+         SimpleString.writeNullableSimpleString(buffer, address);
+         if (userID == null) {
+            buffer.writeByte(DataConstants.NULL);
+         } else {
+            buffer.writeByte(DataConstants.NOT_NULL);
+            buffer.writeBytes(userID.asBytes());
+         }
+         buffer.writeByte(type);
+         buffer.writeBoolean(durable);
+         buffer.writeLong(expiration);
+         buffer.writeLong(timestamp);
+         buffer.writeByte(priority);
+         assert buffer.writerIndex() == initialWriterIndex + headersSize : "Bad Headers encode size estimation";
+         final int realPropertiesEncodeSize = properties.encode(buffer);
+         assert realPropertiesEncodeSize == propertiesEncodeSize : "TypedProperties has a wrong encode size estimation or is being modified concurrently";
       }
-      buffer.writeByte(type);
-      buffer.writeBoolean(durable);
-      buffer.writeLong(expiration);
-      buffer.writeLong(timestamp);
-      buffer.writeByte(priority);
-      properties.encode(buffer);
    }
 
    @Override
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index c52daa4..ef0ef04 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -38,6 +38,9 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
 public class CoreMessageTest {
 
    public static final SimpleString ADDRESS = new SimpleString("this.local.address");
@@ -359,6 +362,34 @@ public class CoreMessageTest {
 
    }
 
+   @Test
+   public void testMemoryEstimateChangedAfterModifiedAlreadyEncodedCopy() {
+      final CoreMessage msg = new CoreMessage(1, 44);
+      msg.getEncodeSize();
+      final int memoryEstimate = msg.getMemoryEstimate();
+      final CoreMessage copy = (CoreMessage) msg.copy(2);
+      copy.getEncodeSize();
+      copy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, new byte[Long.BYTES]);
+      final int increasedMemoryFootprint = copy.getMemoryEstimate() - memoryEstimate;
+      final int increasedPropertyFootprint = copy.getProperties().getMemoryOffset() - msg.getProperties().getMemoryOffset();
+      assertThat("memory estimation isn't accounting for the additional encoded property",
+                 increasedMemoryFootprint, greaterThan(increasedPropertyFootprint));
+   }
+
+   @Test
+   public void testMessageBufferCapacityMatchEncodedSizeAfterModifiedCopy() {
+      final CoreMessage msg = new CoreMessage(1, 4155);
+      msg.setAddress("a");
+      msg.putBytesProperty("_", new byte[4096]);
+      final CoreMessage copy = (CoreMessage) msg.copy(2);
+      Assert.assertEquals(msg.getEncodeSize(), copy.getBuffer().capacity());
+      copy.setAddress("b");
+      copy.setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, "a");
+      copy.setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, msg.getAddressSimpleString());
+      copy.setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, msg.getMessageID());
+      Assert.assertEquals(copy.getEncodeSize(), copy.getBuffer().capacity());
+   }
+
    private void printVariable(String body, String encode) {
       System.out.println("// body = \"" + body + "\";");
       System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 1eacecd..e394660 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -37,6 +37,8 @@ import org.jboss.logging.Logger;
 
 public class LargeBody {
 
+   static final int MEMORY_OFFSET = 56;
+
    private static final Logger logger = Logger.getLogger(LargeBody.class);
 
    private long bodySize = -1;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 267e1ef..6b9e9f1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -33,6 +33,11 @@ import org.jboss.logging.Logger;
 
 public final class LargeServerMessageImpl extends CoreMessage implements CoreLargeServerMessage {
 
+   // Given that LargeBody is never null it needs to be accounted for in this instance's footprint.
+   // This value has been computed using https://github.com/openjdk/jol
+   // with HotSpot 64-bit COOPS 8-byte align
+   private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET;
+
    @Override
    public Message toMessage() {
       return this;
@@ -73,9 +78,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
 
    private final StorageManager storageManager;
 
-   // We cache this
-   private volatile int memoryEstimate = -1;
-
    public LargeServerMessageImpl(final StorageManager storageManager) {
       largeBody = new LargeBody(this, storageManager);
       this.storageManager = storageManager;
@@ -243,8 +245,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    public int getMemoryEstimate() {
       synchronized (largeBody) {
          if (memoryEstimate == -1) {
-            // The body won't be on memory (aways on-file), so we don't consider this for paging
-            memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1;
+            // The body won't be on memory (always on-file), so we don't consider this for paging
+            memoryEstimate = MEMORY_OFFSET +
+               getHeadersAndPropertiesEncodeSize() +
+               DataConstants.SIZE_INT +
+               getEncodeSize() +
+               (16 + 4) * 2 + 1;
          }
 
          return memoryEstimate;