You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/23 14:43:51 UTC
[1/2] activemq-artemis git commit: This closes #1361
Repository: activemq-artemis
Updated Branches:
refs/heads/master d6191e31e -> b640a6241
This closes #1361
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b640a624
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b640a624
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b640a624
Branch: refs/heads/master
Commit: b640a624170ae5339892b81ef2c647b8b1596ed4
Parents: d6191e3 325f46e
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jun 23 09:37:46 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 23 09:37:46 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/core/paging/impl/Page.java | 99 +++++++++++---------
1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1248 Reduce garbage while Paging
Posted by cl...@apache.org.
ARTEMIS-1248 Reduce garbage while Paging
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/325f46e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/325f46e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/325f46e0
Branch: refs/heads/master
Commit: 325f46e01f4c15fb640197d9c7fef3031252d064
Parents: d6191e3
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Jun 22 11:37:16 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 23 09:37:46 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/core/paging/impl/Page.java | 99 +++++++++++---------
1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/325f46e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 077d852..cd6dbd8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -27,11 +27,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
@@ -88,6 +89,9 @@ public final class Page implements Comparable<Page> {
private boolean canBeMapped;
+ private final ActiveMQBuffer activeMQBuffer;
+ private final UnpooledUnsafeDirectByteBufWrapper unsafeByteBufWrapper;
+
public Page(final SimpleString storeName,
final StorageManager storageManager,
final SequentialFileFactory factory,
@@ -99,6 +103,9 @@ public final class Page implements Comparable<Page> {
this.storageManager = storageManager;
this.storeName = storeName;
this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
+ //pooled buffers to avoid allocations on hot paths
+ this.unsafeByteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
+ this.activeMQBuffer = new ChannelBufferWrapper(this.unsafeByteBufWrapper);
}
public int getPageId() {
@@ -134,24 +141,24 @@ public final class Page implements Comparable<Page> {
}
private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception {
- // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer directBuffer = storage.allocateDirectBuffer((int) file.size());
- ActiveMQBuffer fileBuffer = null;
+ final int fileSize = (int) file.size();
+ //doesn't need to be a direct buffer: that case is covered using the MMAP read
+ final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
try {
-
file.position(0);
- file.read(directBuffer);
-
- directBuffer.rewind();
-
- fileBuffer = ActiveMQBuffers.wrappedBuffer(directBuffer);
- fileBuffer.writerIndex(fileBuffer.capacity());
- read(storage, fileBuffer, messages);
- } finally {
- if (fileBuffer != null) {
- fileBuffer.byteBuf().unwrap().release();
+ file.read(buffer);
+ buffer.rewind();
+ assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
+ this.unsafeByteBufWrapper.wrap(buffer, 0, fileSize);
+ try {
+ this.activeMQBuffer.clear();
+ this.activeMQBuffer.writerIndex(fileSize);
+ read(storage, this.activeMQBuffer, messages);
+ } finally {
+ this.unsafeByteBufWrapper.reset();
}
- storage.freeDirectBuffer(directBuffer);
+ } finally {
+ this.fileFactory.releaseBuffer(buffer);
}
}
@@ -167,12 +174,15 @@ public final class Page implements Comparable<Page> {
private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException {
file.position(0);
//use a readonly mapped view of the file
- final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), size.get());
+ final int mappedSize = size.get();
+ final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
+ this.unsafeByteBufWrapper.wrap(mappedByteBuffer, 0, mappedSize);
try {
- final ActiveMQBuffer fileBuffer = ActiveMQBuffers.wrappedBuffer(mappedByteBuffer);
- fileBuffer.writerIndex(fileBuffer.capacity());
- return read(storage, fileBuffer, messages);
+ this.activeMQBuffer.clear();
+ this.activeMQBuffer.writerIndex(mappedSize);
+ return read(storage, this.activeMQBuffer, messages);
} finally {
+ this.unsafeByteBufWrapper.reset();
//unmap the file after read it to avoid GC to take care of it
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
}
@@ -220,35 +230,32 @@ public final class Page implements Comparable<Page> {
public synchronized void write(final PagedMessage message) throws Exception {
if (!file.isOpen()) {
-
return;
}
-
- ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + Page.SIZE_RECORD);
-
- ActiveMQBuffer wrap = ActiveMQBuffers.wrappedBuffer(buffer);
- wrap.clear();
-
- wrap.writeByte(Page.START_BYTE);
- wrap.writeInt(0);
- int startIndex = wrap.writerIndex();
- message.encode(wrap);
- int endIndex = wrap.writerIndex();
- wrap.setInt(1, endIndex - startIndex); // The encoded length
- wrap.writeByte(Page.END_BYTE);
-
- buffer.rewind();
-
- file.writeDirect(buffer, false);
-
- if (pageCache != null) {
- pageCache.addLiveMessage(message);
+ final int messageEncodedSize = message.getEncodeSize();
+ final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
+ final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
+ this.unsafeByteBufWrapper.wrap(buffer, 0, bufferSize);
+ try {
+ this.activeMQBuffer.clear();
+ this.activeMQBuffer.writeByte(Page.START_BYTE);
+ this.activeMQBuffer.writeInt(messageEncodedSize);
+ message.encode(this.activeMQBuffer);
+ this.activeMQBuffer.writeByte(Page.END_BYTE);
+ assert (this.activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected";
+ //buffer limit and position are the same
+ assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed";
+ file.writeDirect(buffer, false);
+ if (pageCache != null) {
+ pageCache.addLiveMessage(message);
+ }
+ //lighter than addAndGet when single writer
+ numberOfMessages.lazySet(numberOfMessages.get() + 1);
+ size.lazySet(size.get() + bufferSize);
+ storageManager.pageWrite(message, pageId);
+ } finally {
+ this.unsafeByteBufWrapper.reset();
}
-
- numberOfMessages.incrementAndGet();
- size.addAndGet(buffer.limit());
-
- storageManager.pageWrite(message, pageId);
}
public void sync() throws Exception {