You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/13 13:47:44 UTC

[activemq-artemis] branch master updated: ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 79465f7  ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations
     new 2b3341d  This closes #2646
79465f7 is described below

commit 79465f7f88e32ca477655bb3f7fc6d261511ebae
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Apr 26 12:12:31 2019 +0200

    ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations
    
    Page::read is allocating a new ChannelBufferWrapper on each
    paged message read: to reduce the allocation rate, it could be
    reused until a new wrapped ByteBuffer is created
---
 .../activemq/artemis/core/paging/impl/Page.java    | 49 ++++++++++++++++------
 1 file changed, 37 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 03d0e67..96ed2ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -22,9 +22,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -119,12 +118,6 @@ public final class Page implements Comparable<Page> {
       return messages;
    }
 
-   private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) {
-      final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer);
-      wrappedBuffer.writerIndex(encodedSize);
-      msg.decode(wrappedBuffer);
-   }
-
    private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
       final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
       newFileBuffer.put(fileBuffer);
@@ -171,6 +164,24 @@ public final class Page implements Comparable<Page> {
       return fileBuffer;
    }
 
+   private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
+      final int position = fileBuffer.position();
+      final int limit = fileBuffer.limit();
+      final int capacity = fileBuffer.capacity();
+      try {
+         fileBuffer.clear();
+         final ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(fileBuffer);
+         //Unpooled::wrappedBuffer sizes the ByteBuf to ByteBuffer::remaining: verify it spans
+         //the whole capacity, otherwise a later ByteBuf::setIndex past it would fail
+         assert wrappedBuffer.readableBytes() == capacity;
+         final ChannelBufferWrapper fileBufferWrapper = new ChannelBufferWrapper(wrappedBuffer);
+         return fileBufferWrapper;
+      } finally {
+         fileBuffer.position(position);
+         fileBuffer.limit(limit);
+      }
+   }
+
    //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
    private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
    private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
@@ -182,24 +193,39 @@ public final class Page implements Comparable<Page> {
       file.position(0);
       int processedBytes = 0;
       ByteBuffer fileBuffer = null;
+      ChannelBufferWrapper fileBufferWrapper;
       try {
          int remainingBytes = fileSize - processedBytes;
          if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
             fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, MIN_CHUNK_SIZE));
+            //the wrapper is reused to avoid unnecessary allocations
+            fileBufferWrapper = wrapWhole(fileBuffer);
+            //no content is being added yet
             fileBuffer.limit(0);
             do {
+               final ByteBuffer oldFileBuffer = fileBuffer;
                fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
+               //change wrapper if fileBuffer has changed
+               if (fileBuffer != oldFileBuffer) {
+                  fileBufferWrapper = wrapWhole(fileBuffer);
+               }
                final byte startByte = fileBuffer.get();
                if (startByte == Page.START_BYTE) {
                   final int encodedSize = fileBuffer.getInt();
                   final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
                   if (nextPosition <= fileSize) {
+                     final ByteBuffer currentFileBuffer = fileBuffer;
                      fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
+                     //change wrapper if fileBuffer has changed
+                     if (fileBuffer != currentFileBuffer) {
+                        fileBufferWrapper = wrapWhole(fileBuffer);
+                     }
                      final int endPosition = fileBuffer.position() + encodedSize;
                      //this check must be performed upfront decoding
                      if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
                         final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
-                        decodeInto(fileBuffer, encodedSize, msg);
+                        fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
+                        msg.decode(fileBufferWrapper);
                         fileBuffer.position(endPosition + 1);
                         assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
                         msg.initMessage(storage);
@@ -415,8 +441,7 @@ public final class Page implements Comparable<Page> {
     * @param pageSubscriptionCounter
     */
    public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
-      Set<PageSubscriptionCounter> counter = getOrCreatePendingCounters();
-      pendingCounters.add(pageSubscriptionCounter);
+      getOrCreatePendingCounters().add(pageSubscriptionCounter);
    }
 
    private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
@@ -430,4 +455,4 @@ public final class Page implements Comparable<Page> {
 
       return pendingCounters;
    }
-}
+}
\ No newline at end of file