You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/23 14:43:51 UTC

[1/2] activemq-artemis git commit: This closes #1361

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d6191e31e -> b640a6241


This closes #1361


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b640a624
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b640a624
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b640a624

Branch: refs/heads/master
Commit: b640a624170ae5339892b81ef2c647b8b1596ed4
Parents: d6191e3 325f46e
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jun 23 09:37:46 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 23 09:37:46 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/core/paging/impl/Page.java | 99 +++++++++++---------
 1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1248 Reduce garbage while Paging

Posted by cl...@apache.org.
ARTEMIS-1248 Reduce garbage while Paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/325f46e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/325f46e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/325f46e0

Branch: refs/heads/master
Commit: 325f46e01f4c15fb640197d9c7fef3031252d064
Parents: d6191e3
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Jun 22 11:37:16 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 23 09:37:46 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/core/paging/impl/Page.java | 99 +++++++++++---------
 1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/325f46e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 077d852..cd6dbd8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -27,11 +27,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
 import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
@@ -88,6 +89,9 @@ public final class Page implements Comparable<Page> {
 
    private boolean canBeMapped;
 
+   private final ActiveMQBuffer activeMQBuffer;
+   private final UnpooledUnsafeDirectByteBufWrapper unsafeByteBufWrapper;
+
    public Page(final SimpleString storeName,
                final StorageManager storageManager,
                final SequentialFileFactory factory,
@@ -99,6 +103,9 @@ public final class Page implements Comparable<Page> {
       this.storageManager = storageManager;
       this.storeName = storeName;
       this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
+      //pooled buffers to avoid allocations on hot paths
+      this.unsafeByteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
+      this.activeMQBuffer = new ChannelBufferWrapper(this.unsafeByteBufWrapper);
    }
 
    public int getPageId() {
@@ -134,24 +141,24 @@ public final class Page implements Comparable<Page> {
    }
 
    private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception {
-      // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
-      ByteBuffer directBuffer = storage.allocateDirectBuffer((int) file.size());
-      ActiveMQBuffer fileBuffer = null;
+      final int fileSize = (int) file.size();
+      //doesn't need to be a direct buffer: that case is covered using the MMAP read
+      final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
       try {
-
          file.position(0);
-         file.read(directBuffer);
-
-         directBuffer.rewind();
-
-         fileBuffer = ActiveMQBuffers.wrappedBuffer(directBuffer);
-         fileBuffer.writerIndex(fileBuffer.capacity());
-         read(storage, fileBuffer, messages);
-      } finally {
-         if (fileBuffer != null) {
-            fileBuffer.byteBuf().unwrap().release();
+         file.read(buffer);
+         buffer.rewind();
+         assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
+         this.unsafeByteBufWrapper.wrap(buffer, 0, fileSize);
+         try {
+            this.activeMQBuffer.clear();
+            this.activeMQBuffer.writerIndex(fileSize);
+            read(storage, this.activeMQBuffer, messages);
+         } finally {
+            this.unsafeByteBufWrapper.reset();
          }
-         storage.freeDirectBuffer(directBuffer);
+      } finally {
+         this.fileFactory.releaseBuffer(buffer);
       }
    }
 
@@ -167,12 +174,15 @@ public final class Page implements Comparable<Page> {
    private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException {
       file.position(0);
       //use a readonly mapped view of the file
-      final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), size.get());
+      final int mappedSize = size.get();
+      final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
+      this.unsafeByteBufWrapper.wrap(mappedByteBuffer, 0, mappedSize);
       try {
-         final ActiveMQBuffer fileBuffer = ActiveMQBuffers.wrappedBuffer(mappedByteBuffer);
-         fileBuffer.writerIndex(fileBuffer.capacity());
-         return read(storage, fileBuffer, messages);
+         this.activeMQBuffer.clear();
+         this.activeMQBuffer.writerIndex(mappedSize);
+         return read(storage, this.activeMQBuffer, messages);
       } finally {
+         this.unsafeByteBufWrapper.reset();
          //unmap the file after read it to avoid GC to take care of it
          PlatformDependent.freeDirectBuffer(mappedByteBuffer);
       }
@@ -220,35 +230,32 @@ public final class Page implements Comparable<Page> {
 
    public synchronized void write(final PagedMessage message) throws Exception {
       if (!file.isOpen()) {
-
          return;
       }
-
-      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + Page.SIZE_RECORD);
-
-      ActiveMQBuffer wrap = ActiveMQBuffers.wrappedBuffer(buffer);
-      wrap.clear();
-
-      wrap.writeByte(Page.START_BYTE);
-      wrap.writeInt(0);
-      int startIndex = wrap.writerIndex();
-      message.encode(wrap);
-      int endIndex = wrap.writerIndex();
-      wrap.setInt(1, endIndex - startIndex); // The encoded length
-      wrap.writeByte(Page.END_BYTE);
-
-      buffer.rewind();
-
-      file.writeDirect(buffer, false);
-
-      if (pageCache != null) {
-         pageCache.addLiveMessage(message);
+      final int messageEncodedSize = message.getEncodeSize();
+      final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
+      final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
+      this.unsafeByteBufWrapper.wrap(buffer, 0, bufferSize);
+      try {
+         this.activeMQBuffer.clear();
+         this.activeMQBuffer.writeByte(Page.START_BYTE);
+         this.activeMQBuffer.writeInt(messageEncodedSize);
+         message.encode(this.activeMQBuffer);
+         this.activeMQBuffer.writeByte(Page.END_BYTE);
+         assert (this.activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected";
+         //buffer limit and position are the same
+         assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed";
+         file.writeDirect(buffer, false);
+         if (pageCache != null) {
+            pageCache.addLiveMessage(message);
+         }
+         //lighter than addAndGet when single writer
+         numberOfMessages.lazySet(numberOfMessages.get() + 1);
+         size.lazySet(size.get() + bufferSize);
+         storageManager.pageWrite(message, pageId);
+      } finally {
+         this.unsafeByteBufWrapper.reset();
       }
-
-      numberOfMessages.incrementAndGet();
-      size.addAndGet(buffer.limit());
-
-      storageManager.pageWrite(message, pageId);
    }
 
    public void sync() throws Exception {