You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/13 13:47:44 UTC
[activemq-artemis] branch master updated: ARTEMIS-2317 Reuse file
buffer wrapper instances to reduce allocations
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 79465f7 ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations
new 2b3341d This closes #2646
79465f7 is described below
commit 79465f7f88e32ca477655bb3f7fc6d261511ebae
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Apr 26 12:12:31 2019 +0200
ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations
Page::read allocates a new ChannelBufferWrapper for every paged
message it reads. To reduce the allocation rate, the wrapper is now
reused until the underlying file ByteBuffer is replaced by a new one.
---
.../activemq/artemis/core/paging/impl/Page.java | 49 ++++++++++++++++------
1 file changed, 37 insertions(+), 12 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 03d0e67..96ed2ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -22,9 +22,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -119,12 +118,6 @@ public final class Page implements Comparable<Page> {
return messages;
}
- private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) {
- final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer);
- wrappedBuffer.writerIndex(encodedSize);
- msg.decode(wrappedBuffer);
- }
-
private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
newFileBuffer.put(fileBuffer);
@@ -171,6 +164,24 @@ public final class Page implements Comparable<Page> {
return fileBuffer;
}
+ private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
+ final int position = fileBuffer.position();
+ final int limit = fileBuffer.limit();
+ final int capacity = fileBuffer.capacity();
+ try {
+ fileBuffer.clear();
+ final ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(fileBuffer);
+ //this check is important to avoid next ByteBuf::setIndex
+ //to fail due to ByteBuf::capacity == ByteBuffer::remaining bytes
+ assert wrappedBuffer.readableBytes() == capacity;
+ final ChannelBufferWrapper fileBufferWrapper = new ChannelBufferWrapper(wrappedBuffer);
+ return fileBufferWrapper;
+ } finally {
+ fileBuffer.position(position);
+ fileBuffer.limit(limit);
+ }
+ }
+
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
@@ -182,24 +193,39 @@ public final class Page implements Comparable<Page> {
file.position(0);
int processedBytes = 0;
ByteBuffer fileBuffer = null;
+ ChannelBufferWrapper fileBufferWrapper;
try {
int remainingBytes = fileSize - processedBytes;
if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, MIN_CHUNK_SIZE));
+ //the wrapper is reused to avoid unnecessary allocations
+ fileBufferWrapper = wrapWhole(fileBuffer);
+ //no content is being added yet
fileBuffer.limit(0);
do {
+ final ByteBuffer oldFileBuffer = fileBuffer;
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
+ //change wrapper if fileBuffer has changed
+ if (fileBuffer != oldFileBuffer) {
+ fileBufferWrapper = wrapWhole(fileBuffer);
+ }
final byte startByte = fileBuffer.get();
if (startByte == Page.START_BYTE) {
final int encodedSize = fileBuffer.getInt();
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
if (nextPosition <= fileSize) {
+ final ByteBuffer currentFileBuffer = fileBuffer;
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
+ //change wrapper if fileBuffer has changed
+ if (fileBuffer != currentFileBuffer) {
+ fileBufferWrapper = wrapWhole(fileBuffer);
+ }
final int endPosition = fileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
- decodeInto(fileBuffer, encodedSize, msg);
+ fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
+ msg.decode(fileBufferWrapper);
fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
@@ -415,8 +441,7 @@ public final class Page implements Comparable<Page> {
* @param pageSubscriptionCounter
*/
public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
- Set<PageSubscriptionCounter> counter = getOrCreatePendingCounters();
- pendingCounters.add(pageSubscriptionCounter);
+ getOrCreatePendingCounters().add(pageSubscriptionCounter);
}
private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
@@ -430,4 +455,4 @@ public final class Page implements Comparable<Page> {
return pendingCounters;
}
-}
+}
\ No newline at end of file