You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 01:02:57 UTC
activemq-artemis git commit: ARTEMIS-2070 broker can reduce buffer
copies with large messages
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x 124fd28cf -> e002dfb3a
ARTEMIS-2070 broker can reduce buffer copies with large messages
(cherry picked from commit 7e09e1b3502361eb0ff51fe56af904922cf7e421)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e002dfb3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e002dfb3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e002dfb3
Branch: refs/heads/2.6.x
Commit: e002dfb3acbb4fcdc74f1b87c1b05b2e7980a788
Parents: 124fd28
Author: Francesco Nigro <ni...@gmail.com>
Authored: Fri Aug 31 20:32:48 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 12 21:02:51 2018 -0400
----------------------------------------------------------------------
.../commands/tools/xml/XMLMessageExporter.java | 27 ++++++---
.../core/client/impl/ClientMessageImpl.java | 17 ++----
.../core/client/impl/ClientProducerImpl.java | 11 ++--
.../artemis/core/message/LargeBodyEncoder.java | 6 --
.../artemis/core/message/impl/CoreMessage.java | 25 ++++----
.../impl/journal/JournalStorageManager.java | 26 +++++++++
.../impl/journal/LargeServerMessageImpl.java | 37 ++++++------
.../nullpm/NullStorageLargeServerMessage.java | 20 +++++++
.../artemis/core/server/LargeServerMessage.java | 3 +
.../core/server/impl/ServerConsumerImpl.java | 60 ++++++++++++++------
.../core/server/impl/ServerSessionImpl.java | 8 +--
11 files changed, 151 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
index a2fbadd..f8063d1 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
+import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -34,7 +33,7 @@ import org.apache.activemq.artemis.reader.TextMessageUtil;
/** This is an Utility class that will import the outputs in XML format. */
public class XMLMessageExporter {
- private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+ private static final int LARGE_MESSAGE_CHUNK_SIZE = 1000;
private XMLStreamWriter xmlWriter;
@@ -70,6 +69,15 @@ public class XMLMessageExporter {
xmlWriter.writeEndElement(); // end MESSAGE_BODY
}
+ private static ByteBuffer acquireHeapBodyBuffer(ByteBuffer chunkBytes, int requiredCapacity) {
+ if (chunkBytes == null || chunkBytes.capacity() != requiredCapacity) {
+ chunkBytes = ByteBuffer.allocate(requiredCapacity);
+ } else {
+ chunkBytes.clear();
+ }
+ return chunkBytes;
+ }
+
public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeBodyEncoder encoder = null;
@@ -78,18 +86,19 @@ public class XMLMessageExporter {
encoder = message.toCore().getBodyEncoder();
encoder.open();
long totalBytesWritten = 0;
- Long bufferSize;
+ int bufferSize;
long bodySize = encoder.getLargeBodySize();
+ ByteBuffer buffer = null;
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
- Long remainder = bodySize - totalBytesWritten;
+ long remainder = bodySize - totalBytesWritten;
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
} else {
- bufferSize = remainder;
+ bufferSize = (int) remainder;
}
- ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
- encoder.encode(buffer, bufferSize.intValue());
- xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
+ buffer = acquireHeapBodyBuffer(buffer, bufferSize);
+ encoder.encode(buffer);
+ xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.array()));
totalBytesWritten += bufferSize;
}
encoder.close();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index d14c64e..52ceb99 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -21,8 +21,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
@@ -400,17 +398,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
}
@Override
- public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
- ActiveMQBuffer buffer1 = ActiveMQBuffers.wrappedBuffer(bufferRead);
- return encode(buffer1, bufferRead.capacity());
- }
-
- @Override
- public int encode(final ActiveMQBuffer bufferOut, final int size) {
- byte[] bytes = new byte[size];
- buffer.readBytes(bytes);
- bufferOut.writeBytes(bytes, 0, size);
- return size;
+ public int encode(final ByteBuffer bufferRead) {
+ final int remaining = bufferRead.remaining();
+ buffer.readBytes(bufferRead);
+ return remaining;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 0ad1999..5cda0c4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.core.client.impl;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -381,16 +380,18 @@ public class ClientProducerImpl implements ClientProducerInternal {
final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize);
- final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
+ final ByteBuffer bodyBuffer = ByteBuffer.allocate(chunkLength);
- context.encode(bodyBuffer, chunkLength);
+ final int encodedSize = context.encode(bodyBuffer);
+
+ assert encodedSize == chunkLength;
pos += chunkLength;
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
- int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+ int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
credits.acquireCredits(creditsUsed);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
index 7a248b4..87e5ba6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.message;
import java.nio.ByteBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
/**
@@ -46,10 +45,5 @@ public interface LargeBodyEncoder {
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
- int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
long getLargeBodySize() throws ActiveMQException;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 5272200..cc79c2c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -254,14 +254,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
- ActiveMQBuffer buffer;
LargeBodyEncoder encoder = getBodyEncoder();
encoder.open();
int bodySize = (int) encoder.getLargeBodySize();
-
- buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
-
- encoder.encode(buffer, bodySize);
+ final ActiveMQBuffer buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+ buffer.byteBuf().ensureWritable(bodySize);
+ final ByteBuffer nioBuffer = buffer.byteBuf().internalNioBuffer(0, bodySize);
+ encoder.encode(nioBuffer);
+ buffer.writerIndex(bodySize);
encoder.close();
return buffer;
}
@@ -1154,16 +1154,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
- public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
- ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
- return encode(buffer, bufferRead.capacity());
- }
-
- @Override
- public int encode(final ActiveMQBuffer bufferOut, final int size) {
- bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
- lastPos += size;
- return size;
+ public int encode(final ByteBuffer bufferRead) {
+ final int remaining = bufferRead.remaining();
+ buffer.getBytes(lastPos, bufferRead);
+ lastPos += remaining;
+ return remaining;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 867f2d4..1d954c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -783,6 +783,32 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
+ public final void addBytesToLargeMessage(final SequentialFile file,
+ final long messageId,
+ final ActiveMQBuffer bytes) throws Exception {
+ readLock();
+ try {
+ file.position(file.size());
+ if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
+ final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
+ file.writeDirect(nioBytes, false);
+
+ if (isReplicated()) {
+ //copy defensively bytes
+ final byte[] bytesCopy = new byte[bytes.readableBytes()];
+ bytes.getBytes(bytes.readerIndex(), bytesCopy);
+ replicator.largeMessageWrite(messageId, bytesCopy);
+ }
+ } else {
+ final byte[] bytesCopy = new byte[bytes.readableBytes()];
+ bytes.readBytes(bytesCopy);
+ addBytesToLargeMessage(file, messageId, bytesCopy);
+ }
+ } finally {
+ readUnLock();
+ }
+ }
+
@Override
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 257141e..110070c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -53,10 +53,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
// We should only use the NIO implementation on the Journal
private SequentialFile file;
- // set when a copyFrom is called
- // The actual copy is done when finishCopy is called
- private SequentialFile pendingCopy;
-
private long bodySize = -1;
private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
@@ -132,6 +128,21 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
@Override
+ public synchronized void addBytes(final ActiveMQBuffer bytes) throws Exception {
+ validateFile();
+
+ if (!file.isOpen()) {
+ file.open();
+ }
+
+ final int readableBytes = bytes.readableBytes();
+
+ storageManager.addBytesToLargeMessage(file, getMessageID(), bytes);
+
+ bodySize += readableBytes;
+ }
+
+ @Override
public synchronized int getEncodeSize() {
return getHeadersAndPropertiesEncodeSize();
}
@@ -488,22 +499,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
- @Override
- public int encode(final ActiveMQBuffer bufferOut, final int size) throws ActiveMQException {
- // This could maybe be optimized (maybe reading directly into bufferOut)
- ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
- int bytesRead = encode(bufferRead);
-
- bufferRead.flip();
-
- if (bytesRead > 0) {
- bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
- }
-
- return bytesRead;
- }
-
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize()
*/
@@ -512,4 +507,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
return getBodySize();
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 6280746..d709e28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -49,11 +49,31 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe
}
@Override
+ public synchronized void addBytes(ActiveMQBuffer bytes) {
+ final int readableBytes = bytes.readableBytes();
+ if (buffer == null) {
+ buffer = Unpooled.buffer(readableBytes);
+ }
+
+ // expand the buffer
+ buffer.ensureWritable(readableBytes);
+ assert buffer.hasArray();
+ final int writerIndex = buffer.writerIndex();
+ bytes.readBytes(buffer.array(), buffer.arrayOffset() + writerIndex, readableBytes);
+ buffer.writerIndex(writerIndex + readableBytes);
+ }
+
+ @Override
public synchronized ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(0, buffer.writerIndex()).asReadOnly());
}
@Override
+ public synchronized int getBodyBufferSize() {
+ return buffer.writerIndex();
+ }
+
+ @Override
public void deleteFile() throws Exception {
// nothing to be done here.. we don really have a file on this Storage
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index a80e369..69592bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -28,6 +29,8 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage
@Override
void addBytes(byte[] bytes) throws Exception;
+ void addBytes(ActiveMQBuffer bytes) throws Exception;
+
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 22bfdaf..ad8e668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
@@ -1182,12 +1181,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private LargeBodyEncoder context;
+ private ByteBuffer chunkBytes;
+
private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
largeMessage = message;
largeMessage.incrementDelayDeletionCount();
this.ref = ref;
+
+ this.chunkBytes = null;
+ }
+
+ private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) {
+ if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) {
+ this.chunkBytes = ByteBuffer.allocate(requiredCapacity);
+ } else {
+ this.chunkBytes.clear();
+ }
+ return this.chunkBytes;
+ }
+
+ private void releaseHeapBodyBuffer() {
+ this.chunkBytes = null;
}
public boolean deliver() throws Exception {
@@ -1207,7 +1223,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
availableCredits);
}
-
+ releaseHeapBodyBuffer();
return false;
}
@@ -1223,7 +1239,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
if (availableCredits != null) {
- availableCredits.addAndGet(-packetSize);
+ final int credits = availableCredits.addAndGet(-packetSize);
+
+ if (credits <= 0) {
+ releaseHeapBodyBuffer();
+ }
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::" +
@@ -1246,32 +1266,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
availableCredits);
}
-
+ releaseHeapBodyBuffer();
return false;
}
- int localChunkLen = 0;
+ final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+ final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
- ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(localChunkLen);
+ assert bodyBuffer.remaining() == localChunkLen;
- context.encode(bodyBuffer, localChunkLen);
+ final int readBytes = context.encode(bodyBuffer);
- byte[] body;
+ assert readBytes == localChunkLen;
- if (bodyBuffer.toByteBuffer().hasArray()) {
- body = bodyBuffer.toByteBuffer().array();
- } else {
- body = new byte[0];
- }
+ final byte[] body = bodyBuffer.array();
+
+ assert body.length == readBytes;
+
+ //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+ //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+ //resendCache != null && packet.isRequiresConfirmations()
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
int chunkLen = body.length;
if (availableCredits != null) {
- availableCredits.addAndGet(-packetSize);
+ final int credits = availableCredits.addAndGet(-packetSize);
+
+ if (credits <= 0) {
+ releaseHeapBodyBuffer();
+ }
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
@@ -1304,6 +1330,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void finish() throws Exception {
synchronized (lock) {
+ releaseHeapBodyBuffer();
+
if (largeMessage == null) {
// handleClose could be calling close while handle is also calling finish.
// As a result one of them could get here after the largeMessage is already gone.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index bd36659..2e3e2f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1398,13 +1398,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private LargeServerMessage messageToLargeMessage(Message message) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage);
-
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
- byte[] body = new byte[buffer.readableBytes()];
- buffer.readBytes(body);
- lsm.addBytes(body);
+ final int readableBytes = buffer.readableBytes();
+ lsm.addBytes(buffer);
lsm.releaseResources();
- lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length);
+ lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm;
}