You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 01:02:57 UTC

activemq-artemis git commit: ARTEMIS-2070 broker can reduce buffer copies with large messages

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 124fd28cf -> e002dfb3a


ARTEMIS-2070 broker can reduce buffer copies with large messages

(cherry picked from commit 7e09e1b3502361eb0ff51fe56af904922cf7e421)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e002dfb3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e002dfb3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e002dfb3

Branch: refs/heads/2.6.x
Commit: e002dfb3acbb4fcdc74f1b87c1b05b2e7980a788
Parents: 124fd28
Author: Francesco Nigro <ni...@gmail.com>
Authored: Fri Aug 31 20:32:48 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 12 21:02:51 2018 -0400

----------------------------------------------------------------------
 .../commands/tools/xml/XMLMessageExporter.java  | 27 ++++++---
 .../core/client/impl/ClientMessageImpl.java     | 17 ++----
 .../core/client/impl/ClientProducerImpl.java    | 11 ++--
 .../artemis/core/message/LargeBodyEncoder.java  |  6 --
 .../artemis/core/message/impl/CoreMessage.java  | 25 ++++----
 .../impl/journal/JournalStorageManager.java     | 26 +++++++++
 .../impl/journal/LargeServerMessageImpl.java    | 37 ++++++------
 .../nullpm/NullStorageLargeServerMessage.java   | 20 +++++++
 .../artemis/core/server/LargeServerMessage.java |  3 +
 .../core/server/impl/ServerConsumerImpl.java    | 60 ++++++++++++++------
 .../core/server/impl/ServerSessionImpl.java     |  8 +--
 11 files changed, 151 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
index a2fbadd..f8063d1 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
 
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
+import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
@@ -34,7 +33,7 @@ import org.apache.activemq.artemis.reader.TextMessageUtil;
 /** This is an Utility class that will import the outputs in XML format. */
 public class XMLMessageExporter {
 
-   private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+   private static final int LARGE_MESSAGE_CHUNK_SIZE = 1000;
 
    private XMLStreamWriter xmlWriter;
 
@@ -70,6 +69,15 @@ public class XMLMessageExporter {
       xmlWriter.writeEndElement(); // end MESSAGE_BODY
    }
 
+   private static ByteBuffer acquireHeapBodyBuffer(ByteBuffer chunkBytes, int requiredCapacity) {
+      if (chunkBytes == null || chunkBytes.capacity() != requiredCapacity) {
+         chunkBytes = ByteBuffer.allocate(requiredCapacity);
+      } else {
+         chunkBytes.clear();
+      }
+      return chunkBytes;
+   }
+
    public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
       LargeBodyEncoder encoder = null;
@@ -78,18 +86,19 @@ public class XMLMessageExporter {
          encoder = message.toCore().getBodyEncoder();
          encoder.open();
          long totalBytesWritten = 0;
-         Long bufferSize;
+         int bufferSize;
          long bodySize = encoder.getLargeBodySize();
+         ByteBuffer buffer = null;
          for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
-            Long remainder = bodySize - totalBytesWritten;
+            long remainder = bodySize - totalBytesWritten;
             if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
                bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
             } else {
-               bufferSize = remainder;
+               bufferSize = (int) remainder;
             }
-            ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
-            encoder.encode(buffer, bufferSize.intValue());
-            xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
+            buffer = acquireHeapBodyBuffer(buffer, bufferSize);
+            encoder.encode(buffer);
+            xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.array()));
             totalBytesWritten += bufferSize;
          }
          encoder.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index d14c64e..52ceb99 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -21,8 +21,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -400,17 +398,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
       }
 
       @Override
-      public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
-         ActiveMQBuffer buffer1 = ActiveMQBuffers.wrappedBuffer(bufferRead);
-         return encode(buffer1, bufferRead.capacity());
-      }
-
-      @Override
-      public int encode(final ActiveMQBuffer bufferOut, final int size) {
-         byte[] bytes = new byte[size];
-         buffer.readBytes(bytes);
-         bufferOut.writeBytes(bytes, 0, size);
-         return size;
+      public int encode(final ByteBuffer bufferRead) {
+         final int remaining = bufferRead.remaining();
+         buffer.readBytes(bufferRead);
+         return remaining;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 0ad1999..5cda0c4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.core.client.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
@@ -381,16 +380,18 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
             final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize);
 
-            final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
+            final ByteBuffer bodyBuffer = ByteBuffer.allocate(chunkLength);
 
-            context.encode(bodyBuffer, chunkLength);
+            final int encodedSize = context.encode(bodyBuffer);
+
+            assert encodedSize == chunkLength;
 
             pos += chunkLength;
 
             lastChunk = pos >= bodySize;
             SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
 
-            int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+            int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
 
             credits.acquireCredits(creditsUsed);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
index 7a248b4..87e5ba6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.message;
 
 import java.nio.ByteBuffer;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 
 /**
@@ -46,10 +45,5 @@ public interface LargeBodyEncoder {
    /**
     * This method must not be called directly by ActiveMQ Artemis clients.
     */
-   int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
    long getLargeBodySize() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 5272200..cc79c2c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -254,14 +254,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
-      ActiveMQBuffer buffer;
       LargeBodyEncoder encoder = getBodyEncoder();
       encoder.open();
       int bodySize = (int) encoder.getLargeBodySize();
-
-      buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
-
-      encoder.encode(buffer, bodySize);
+      final ActiveMQBuffer buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+      buffer.byteBuf().ensureWritable(bodySize);
+      final ByteBuffer nioBuffer = buffer.byteBuf().internalNioBuffer(0, bodySize);
+      encoder.encode(nioBuffer);
+      buffer.writerIndex(bodySize);
       encoder.close();
       return buffer;
    }
@@ -1154,16 +1154,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       }
 
       @Override
-      public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
-         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
-         return encode(buffer, bufferRead.capacity());
-      }
-
-      @Override
-      public int encode(final ActiveMQBuffer bufferOut, final int size) {
-         bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
-         lastPos += size;
-         return size;
+      public int encode(final ByteBuffer bufferRead) {
+         final int remaining = bufferRead.remaining();
+         buffer.getBytes(lastPos, bufferRead);
+         lastPos += remaining;
+         return remaining;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 867f2d4..1d954c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -783,6 +783,32 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       }
    }
 
+   public final void addBytesToLargeMessage(final SequentialFile file,
+                                            final long messageId,
+                                            final ActiveMQBuffer bytes) throws Exception {
+      readLock();
+      try {
+         file.position(file.size());
+         if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
+            final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
+            file.writeDirect(nioBytes, false);
+
+            if (isReplicated()) {
+               //copy defensively bytes
+               final byte[] bytesCopy = new byte[bytes.readableBytes()];
+               bytes.getBytes(bytes.readerIndex(), bytesCopy);
+               replicator.largeMessageWrite(messageId, bytesCopy);
+            }
+         } else {
+            final byte[] bytesCopy = new byte[bytes.readableBytes()];
+            bytes.readBytes(bytesCopy);
+            addBytesToLargeMessage(file, messageId, bytesCopy);
+         }
+      } finally {
+         readUnLock();
+      }
+   }
+
    @Override
    public final void addBytesToLargeMessage(final SequentialFile file,
                                             final long messageId,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 257141e..110070c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -53,10 +53,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
 
-   // set when a copyFrom is called
-   // The actual copy is done when finishCopy is called
-   private SequentialFile pendingCopy;
-
    private long bodySize = -1;
 
    private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
@@ -132,6 +128,21 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    }
 
    @Override
+   public synchronized void addBytes(final ActiveMQBuffer bytes) throws Exception {
+      validateFile();
+
+      if (!file.isOpen()) {
+         file.open();
+      }
+
+      final int readableBytes = bytes.readableBytes();
+
+      storageManager.addBytesToLargeMessage(file, getMessageID(), bytes);
+
+      bodySize += readableBytes;
+   }
+
+   @Override
    public synchronized int getEncodeSize() {
       return getHeadersAndPropertiesEncodeSize();
    }
@@ -488,22 +499,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
          }
       }
 
-      @Override
-      public int encode(final ActiveMQBuffer bufferOut, final int size) throws ActiveMQException {
-         // This could maybe be optimized (maybe reading directly into bufferOut)
-         ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
-         int bytesRead = encode(bufferRead);
-
-         bufferRead.flip();
-
-         if (bytesRead > 0) {
-            bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
-         }
-
-         return bytesRead;
-      }
-
       /* (non-Javadoc)
        * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize()
        */
@@ -512,4 +507,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
          return getBodySize();
       }
    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 6280746..d709e28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -49,11 +49,31 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe
    }
 
    @Override
+   public synchronized void addBytes(ActiveMQBuffer bytes) {
+      final int readableBytes = bytes.readableBytes();
+      if (buffer == null) {
+         buffer = Unpooled.buffer(readableBytes);
+      }
+
+      // expand the buffer
+      buffer.ensureWritable(readableBytes);
+      assert buffer.hasArray();
+      final int writerIndex = buffer.writerIndex();
+      bytes.readBytes(buffer.array(), buffer.arrayOffset() + writerIndex, readableBytes);
+      buffer.writerIndex(writerIndex + readableBytes);
+   }
+
+   @Override
    public synchronized ActiveMQBuffer getReadOnlyBodyBuffer() {
       return new ChannelBufferWrapper(buffer.slice(0, buffer.writerIndex()).asReadOnly());
    }
 
    @Override
+   public synchronized int getBodyBufferSize() {
+      return buffer.writerIndex();
+   }
+
+   @Override
    public void deleteFile() throws Exception {
       // nothing to be done here.. we don really have a file on this Storage
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index a80e369..69592bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -28,6 +29,8 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage
    @Override
    void addBytes(byte[] bytes) throws Exception;
 
+   void addBytes(ActiveMQBuffer bytes) throws Exception;
+
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 22bfdaf..ad8e668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -1182,12 +1181,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       private LargeBodyEncoder context;
 
+      private ByteBuffer chunkBytes;
+
       private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
          largeMessage = message;
 
          largeMessage.incrementDelayDeletionCount();
 
          this.ref = ref;
+
+         this.chunkBytes = null;
+      }
+
+      private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) {
+         if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) {
+            this.chunkBytes = ByteBuffer.allocate(requiredCapacity);
+         } else {
+            this.chunkBytes.clear();
+         }
+         return this.chunkBytes;
+      }
+
+      private void releaseHeapBodyBuffer() {
+         this.chunkBytes = null;
       }
 
       public boolean deliver() throws Exception {
@@ -1207,7 +1223,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                   logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
                                   availableCredits);
                }
-
+               releaseHeapBodyBuffer();
                return false;
             }
 
@@ -1223,7 +1239,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
 
                if (availableCredits != null) {
-                  availableCredits.addAndGet(-packetSize);
+                  final int credits = availableCredits.addAndGet(-packetSize);
+
+                  if (credits <= 0) {
+                     releaseHeapBodyBuffer();
+                  }
 
                   if (logger.isTraceEnabled()) {
                      logger.trace(this + "::FlowControl::" +
@@ -1246,32 +1266,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                      logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
                                      availableCredits);
                   }
-
+                  releaseHeapBodyBuffer();
                   return false;
                }
 
-               int localChunkLen = 0;
+               final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-               localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+               final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
 
-               ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(localChunkLen);
+               assert bodyBuffer.remaining() == localChunkLen;
 
-               context.encode(bodyBuffer, localChunkLen);
+               final int readBytes = context.encode(bodyBuffer);
 
-               byte[] body;
+               assert readBytes == localChunkLen;
 
-               if (bodyBuffer.toByteBuffer().hasArray()) {
-                  body = bodyBuffer.toByteBuffer().array();
-               } else {
-                  body = new byte[0];
-               }
+               final byte[] body = bodyBuffer.array();
+
+               assert body.length == readBytes;
+
+               //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+               //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+               //resendCache != null && packet.isRequiresConfirmations()
 
                int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
 
                int chunkLen = body.length;
 
                if (availableCredits != null) {
-                  availableCredits.addAndGet(-packetSize);
+                  final int credits = availableCredits.addAndGet(-packetSize);
+
+                  if (credits <= 0) {
+                     releaseHeapBodyBuffer();
+                  }
 
                   if (logger.isTraceEnabled()) {
                      logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
@@ -1304,6 +1330,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       public void finish() throws Exception {
          synchronized (lock) {
+            releaseHeapBodyBuffer();
+
             if (largeMessage == null) {
                // handleClose could be calling close while handle is also calling finish.
                // As a result one of them could get here after the largeMessage is already gone.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e002dfb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index bd36659..2e3e2f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1398,13 +1398,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    private LargeServerMessage messageToLargeMessage(Message message) throws Exception {
       ICoreMessage coreMessage = message.toCore();
       LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage);
-
       ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
-      byte[] body = new byte[buffer.readableBytes()];
-      buffer.readBytes(body);
-      lsm.addBytes(body);
+      final int readableBytes = buffer.readableBytes();
+      lsm.addBytes(buffer);
       lsm.releaseResources();
-      lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length);
+      lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
       return lsm;
    }