You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/24 19:05:34 UTC

activemq-artemis git commit: ARTEMIS-1996 MappedSequentialFileFactory may cause DirectByteBuffer off-heap memory leaks

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 0b0499b8a -> 58f617b3c


ARTEMIS-1996 MappedSequentialFileFactory may cause DirectByteBuffer off-heap memory leaks

Compaction now reuses direct ByteBuffers for both
reading and writing, and releases them explicitly and
deterministically, to avoid high peaks of native memory
utilisation after compaction.

(cherry picked from commit 2967df6a998b93cf471c39e25faa6d3a4c821ae0)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/58f617b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/58f617b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/58f617b3

Branch: refs/heads/2.6.x
Commit: 58f617b3c678bf82cc56c801f77995631f44daf7
Parents: 0b0499b
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Aug 15 14:39:41 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 24 15:05:24 2018 -0400

----------------------------------------------------------------------
 .../jdbc/store/file/JDBCSequentialFile.java     |   5 +
 .../artemis/core/io/SequentialFile.java         |  14 ++
 .../artemis/core/io/aio/AIOSequentialFile.java  |  35 ++++-
 .../core/io/aio/AIOSequentialFileFactory.java   |   9 +-
 .../core/io/mapped/MappedSequentialFile.java    |  22 +++
 .../core/io/mapped/TimedSequentialFile.java     |   5 +
 .../artemis/core/io/nio/NIOSequentialFile.java  |  23 ++-
 .../journal/impl/AbstractJournalUpdateTask.java |  85 ++++++----
 .../artemis/core/journal/impl/JournalImpl.java  | 155 ++++++++++++-------
 .../impl/fakes/FakeSequentialFileFactory.java   |  13 ++
 10 files changed, 272 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 0b88d9a..8bbf7b9 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -251,6 +251,11 @@ public class JDBCSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) {
+      writeDirect(bytes, sync, null);
+   }
+
+   @Override
    public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
       writeDirect(bytes, sync, null);
       // Are we meant to block here?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
index 49130e6..b6e4938 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
@@ -78,6 +78,20 @@ public interface SequentialFile {
    void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
 
    /**
+    * Write directly to the file without using any intermediate buffer and wait completion.<br>
+    * If {@code releaseBuffer} is {@code true} the provided {@code bytes} should be released
+    * through {@link SequentialFileFactory#releaseBuffer(ByteBuffer)}, if supported.
+    *
+    * @param bytes         the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *                      NIO). If {@code releaseBuffer} is {@code true} use a buffer from
+    *                      {@link SequentialFileFactory#newBuffer(int)}, {@link SequentialFileFactory#allocateDirectBuffer(int)}
+    *                      otherwise.
+    * @param sync          if {@code true} will durable flush the written data on the file, {@code false} otherwise
+    * @param releaseBuffer if {@code true} will release the buffer, {@code false} otherwise
+    */
+   void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception;
+
+   /**
     * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
     *              NIO). To be safe, use a buffer from the corresponding
     *              {@link SequentialFileFactory#newBuffer(int)}.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index 139b236..bc3c408 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -233,6 +233,33 @@ public class AIOSequentialFile extends AbstractSequentialFile {
       }
    }
 
+   @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Write Direct, Sync: true File: " + getFileName());
+      }
+
+      final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+      try {
+         checkOpened();
+      } catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+         completion.onError(-1, e.getMessage());
+         return;
+      }
+
+      final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+      final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+      final AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
+      runnableCallback.initWrite(positionToWrite, bytesToWrite);
+      runnableCallback.run();
+
+      completion.waitCompletion();
+   }
+
    /**
     * Note: Parameter sync is not used on AIO
     */
@@ -256,8 +283,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
    }
 
    AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
+      return getCallback(originalCallback, buffer, true);
+   }
+
+   AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback,
+                                                              ByteBuffer buffer,
+                                                              boolean releaseBuffer) {
       AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
-      callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
+      callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
       pendingCallbacks.countUp();
       return callback;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index d8288e6..3cdf9fe 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -286,7 +286,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
       String errorMessage;
       int errorCode = -1;
       long writeSequence;
-
+      boolean releaseBuffer;
       long position;
       int bytes;
 
@@ -297,6 +297,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
             ", errorMessage='" + errorMessage + '\'' +
             ", errorCode=" + errorCode +
             ", writeSequence=" + writeSequence +
+            ", releaseBuffer=" + releaseBuffer +
             ", position=" + position +
             '}';
       }
@@ -332,7 +333,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                                         IOCallback IOCallback,
                                         LibaioFile libaioFile,
                                         AIOSequentialFile sequentialFile,
-                                        ByteBuffer usedBuffer) {
+                                        ByteBuffer usedBuffer,
+                                        boolean releaseBuffer) {
          this.callback = IOCallback;
          this.sequentialFile = sequentialFile;
          this.error = false;
@@ -340,6 +342,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
          this.libaioFile = libaioFile;
          this.writeSequence = writeSequence;
          this.errorMessage = null;
+         this.releaseBuffer = releaseBuffer;
          return this;
       }
 
@@ -375,7 +378,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                callback.done();
             }
 
-            if (buffer != null && reuseBuffers) {
+            if (buffer != null && reuseBuffers && releaseBuffer) {
                buffersControl.bufferDone(buffer);
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 4c5e23a..efce280 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -274,6 +274,28 @@ final class MappedSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+      try {
+         checkIsOpen();
+         final int position = bytes.position();
+         final int limit = bytes.limit();
+         final int remaining = limit - position;
+         if (remaining > 0) {
+            this.mappedFile.write(bytes, position, remaining);
+            final int newPosition = position + remaining;
+            bytes.position(newPosition);
+            if (factory.isDatasync() && sync) {
+               this.mappedFile.force();
+            }
+         }
+      } finally {
+         if (releaseBuffer) {
+            this.factory.releaseBuffer(bytes);
+         }
+      }
+   }
+
+   @Override
    public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
index 8436ed5..e0a877a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -97,6 +97,11 @@ final class TimedSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      this.sequentialFile.blockingWriteDirect(bytes, sync, releaseBuffer);
+   }
+
+   @Override
    public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
       if (this.timedBuffer != null) {
          this.timedBuffer.addBytes(bytes, sync, callback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 55654b7..5f65b64 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -244,7 +244,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       }
 
       try {
-         internalWrite(bytes, sync, callback);
+         internalWrite(bytes, sync, callback, true);
       } catch (Exception e) {
          callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
       }
@@ -252,7 +252,12 @@ public class NIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
-      internalWrite(bytes, sync, null);
+      internalWrite(bytes, sync, null, true);
+   }
+
+   @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      internalWrite(bytes, sync, null, releaseBuffer);
    }
 
    @Override
@@ -266,7 +271,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
 
    private void internalWrite(final ByteBuffer bytes,
                               final boolean sync,
-                              final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
+                              final IOCallback callback,
+                              boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
       if (!isOpen()) {
          if (callback != null) {
             callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
@@ -279,7 +285,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       position.addAndGet(bytes.limit());
 
       try {
-         doInternalWrite(bytes, sync, callback);
+         doInternalWrite(bytes, sync, callback, releaseBuffer);
       } catch (ClosedChannelException e) {
          throw e;
       } catch (IOException e) {
@@ -296,7 +302,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
     */
    private void doInternalWrite(final ByteBuffer bytes,
                                 final boolean sync,
-                                final IOCallback callback) throws IOException {
+                                final IOCallback callback,
+                                boolean releaseBuffer) throws IOException {
       try {
          channel.write(bytes);
 
@@ -308,8 +315,10 @@ public class NIOSequentialFile extends AbstractSequentialFile {
             callback.done();
          }
       } finally {
-         //release it to recycle the write buffer if big enough
-         this.factory.releaseBuffer(bytes);
+         if (releaseBuffer) {
+            //release it to recycle the write buffer if big enough
+            this.factory.releaseBuffer(bytes);
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 10f5008..8f61f99 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -152,10 +153,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
       }
    }
 
-   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+   static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
                                                 final List<String> dataFiles,
                                                 final List<String> newFiles,
-                                                final List<Pair<String, String>> renameFile) throws Exception {
+                                                final List<Pair<String, String>> renameFile,
+                                                final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
       SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
 
       if (controlFile.exists()) {
@@ -163,13 +165,12 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
          final ArrayList<RecordInfo> records = new ArrayList<>();
 
-
          JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
             @Override
             public void onReadAddRecord(final RecordInfo info) throws Exception {
                records.add(info);
             }
-         });
+         }, wholeFileBufferRef);
 
          if (records.size() == 0) {
             // the record is damaged
@@ -205,29 +206,48 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
       }
    }
 
+   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+                                                final List<String> dataFiles,
+                                                final List<String> newFiles,
+                                                final List<Pair<String, String>> renameFile) throws Exception {
+      return readControlFile(fileFactory, dataFiles, newFiles, renameFile, null);
+   }
+
+   private void flush(boolean releaseWritingBuffer) throws Exception {
+      if (writingChannel != null) {
+         try {
+            if (sequentialFile.isOpen()) {
+               try {
+                  sequentialFile.position(0);
+
+                  // To Fix the size of the file
+                  writingChannel.writerIndex(writingChannel.capacity());
+
+                  final ByteBuffer byteBuffer = bufferWrite;
+                  final int readerIndex = writingChannel.readerIndex();
+                  byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
+                  sequentialFile.blockingWriteDirect(byteBuffer, true, false);
+               } finally {
+                  sequentialFile.close();
+                  newDataFiles.add(currentFile);
+               }
+            }
+         } finally {
+            if (releaseWritingBuffer) {
+               //deterministic release of native resources
+               fileFactory.releaseDirectBuffer(bufferWrite);
+               writingChannel = null;
+               bufferWrite = null;
+            }
+         }
+      }
+   }
 
    /**
     * Write pending output into file
     */
    public void flush() throws Exception {
-      if (writingChannel != null) {
-         sequentialFile.position(0);
-
-         // To Fix the size of the file
-         writingChannel.writerIndex(writingChannel.capacity());
-
-         bufferWrite.clear()
-            .position(writingChannel.readerIndex())
-            .limit(writingChannel.readableBytes());
-
-         sequentialFile.writeDirect(bufferWrite, true);
-         sequentialFile.close();
-         newDataFiles.add(currentFile);
-      }
-
-      bufferWrite = null;
-
-      writingChannel = null;
+      flush(true);
    }
 
    public boolean containsRecord(final long id) {
@@ -243,11 +263,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
     */
 
    protected void openFile() throws Exception {
-      flush();
-
-      bufferWrite = fileFactory.newBuffer(journal.getFileSize());
-
-      writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+      flush(false);
 
       currentFile = filesRepository.openFileCMP();
 
@@ -257,6 +273,21 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
       currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
 
+      final int fileSize = journal.getFileSize();
+      if (bufferWrite != null && bufferWrite.capacity() < fileSize) {
+         fileFactory.releaseDirectBuffer(bufferWrite);
+         bufferWrite = null;
+         writingChannel = null;
+      }
+      if (bufferWrite == null) {
+         final ByteBuffer bufferWrite = fileFactory.allocateDirectBuffer(fileSize);
+         this.bufferWrite = bufferWrite;
+         writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+      } else {
+         writingChannel.clear();
+         bufferWrite.clear();
+      }
+
       JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 47bdc5b..fe08ed8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.journal.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -42,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -461,17 +461,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       // Now order them by ordering id - we can't use the file name for ordering
       // since we can re-use dataFiles
 
-      Collections.sort(orderedFiles, new JournalFileComparator());
+      Collections.sort(orderedFiles, JOURNAL_FILE_COMPARATOR);
 
       return orderedFiles;
    }
 
-   /**
-    * this method is used internally only however tools may use it to maintenance.
-    */
-   public static int readJournalFile(final SequentialFileFactory fileFactory,
-                                     final JournalFile file,
-                                     final JournalReaderCallback reader) throws Exception {
+   private static ByteBuffer allocateDirectBufferIfNeeded(final SequentialFileFactory fileFactory,
+                                                          final int requiredCapacity,
+                                                          final AtomicReference<ByteBuffer> bufferRef) {
+      ByteBuffer buffer = bufferRef != null ? bufferRef.get() : null;
+      if (buffer != null && buffer.capacity() < requiredCapacity) {
+         fileFactory.releaseDirectBuffer(buffer);
+         buffer = null;
+      }
+      if (buffer == null) {
+         buffer = fileFactory.allocateDirectBuffer(requiredCapacity);
+      } else {
+         buffer.clear().limit(requiredCapacity);
+      }
+      if (bufferRef != null) {
+         bufferRef.lazySet(buffer);
+      }
+      return buffer;
+   }
+
+   static int readJournalFile(final SequentialFileFactory fileFactory,
+                              final JournalFile file,
+                              final JournalReaderCallback reader,
+                              final AtomicReference<ByteBuffer> wholeFileBufferReference) throws Exception {
       file.getFile().open(1, false);
       ByteBuffer wholeFileBuffer = null;
       try {
@@ -481,8 +498,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             // the file is damaged or the system crash before it was able to write
             return -1;
          }
-
-         wholeFileBuffer = fileFactory.newBuffer(filesize);
+         wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
 
          final int journalFileSize = file.getFile().read(wholeFileBuffer);
 
@@ -771,16 +787,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             lastDataPos = wholeFileBuffer.position();
 
          }
-
          return lastDataPos;
       } catch (Throwable e) {
          ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
          throw new Exception(e.getMessage(), e);
       } finally {
-         if (wholeFileBuffer != null) {
-            fileFactory.releaseBuffer(wholeFileBuffer);
+         if (wholeFileBufferReference == null && wholeFileBuffer != null) {
+            fileFactory.releaseDirectBuffer(wholeFileBuffer);
          }
-
          try {
             file.getFile().close();
          } catch (Throwable ignored) {
@@ -788,6 +802,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
+   /**
+    * this method is used internally only however tools may use it to maintenance.
+    */
+   public static int readJournalFile(final SequentialFileFactory fileFactory,
+                                     final JournalFile file,
+                                     final JournalReaderCallback reader) throws Exception {
+      return readJournalFile(fileFactory, file, reader, null);
+   }
+
    // Journal implementation
    // ----------------------------------------------------------------
 
@@ -1594,19 +1617,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                journalLock.writeLock().unlock();
             }
 
-            Collections.sort(dataFilesToProcess, new JournalFileComparator());
+            Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
 
             // This is where most of the work is done, taking most of the time of the compacting routine.
             // Notice there are no locks while this is being done.
 
             // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
             // well
-            for (final JournalFile file : dataFilesToProcess) {
-               try {
-                  JournalImpl.readJournalFile(fileFactory, file, compactor);
-               } catch (Throwable e) {
-                  ActiveMQJournalLogger.LOGGER.compactReadError(file);
-                  throw new Exception("Error on reading compacting for " + file, e);
+            // this AtomicReference is not used for thread-safety, but just as a reference
+            final AtomicReference<ByteBuffer> wholeFileBufferRef = dataFilesToProcess.isEmpty() ? null : new AtomicReference<>();
+            try {
+               for (final JournalFile file : dataFilesToProcess) {
+                  try {
+                     JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
+                  } catch (Throwable e) {
+                     ActiveMQJournalLogger.LOGGER.compactReadError(file);
+                     throw new Exception("Error on reading compacting for " + file, e);
+                  }
+               }
+            } finally {
+               ByteBuffer wholeFileBuffer;
+               if (wholeFileBufferRef != null && (wholeFileBuffer = wholeFileBufferRef.get()) != null) {
+                  fileFactory.releaseDirectBuffer(wholeFileBuffer);
                }
             }
 
@@ -1758,16 +1790,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
     */
    private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
                                                     final boolean changeData,
-                                                    final JournalState replicationSync) throws Exception {
-      if (state == JournalState.STOPPED || state == JournalState.LOADED) {
-         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
-                                            state);
-      }
-      if (state == replicationSync) {
-         throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
-      }
+                                                    final JournalState replicationSync,
+                                                    final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
+      JournalState state;
+      assert (state = this.state) != JournalState.STOPPED &&
+         state != JournalState.LOADED &&
+         state != replicationSync;
 
-      checkControlFile();
+      checkControlFile(wholeFileBufferRef);
 
       records.clear();
 
@@ -1796,7 +1826,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             private void checkID(final long id) {
                if (id > maxID.longValue()) {
-                  maxID.set(id);
+                  maxID.lazySet(id);
                }
             }
 
@@ -1804,7 +1834,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadAddRecord(final RecordInfo info) throws Exception {
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.addRecord(info);
 
@@ -1815,7 +1845,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadUpdateRecord(final RecordInfo info) throws Exception {
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.updateRecord(info);
 
@@ -1833,7 +1863,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             @Override
             public void onReadDeleteRecord(final long recordID) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.deleteRecord(recordID);
 
@@ -1854,7 +1884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1880,7 +1910,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             @Override
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1908,7 +1938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadPrepareRecord(final long transactionID,
                                             final byte[] extraData,
                                             final int numberOfRecords) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1981,7 +2011,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                      journalTransaction.forget();
                   }
 
-                  hasData.set(true);
+                  hasData.lazySet(true);
                }
 
             }
@@ -2005,16 +2035,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   // Rollbacks.. We will ignore the data anyway.
                   tnp.rollback(file);
 
-                  hasData.set(true);
+                  hasData.lazySet(true);
                }
             }
 
             @Override
             public void markAsDataFile(final JournalFile file) {
-               hasData.set(true);
+               hasData.lazySet(true);
             }
 
-         });
+         }, wholeFileBufferRef);
 
          if (hasData.get()) {
             lastDataPos = resultLastPost;
@@ -2050,7 +2080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          } else {
             for (RecordInfo info : transaction.recordInfos) {
                if (info.id > maxID.get()) {
-                  maxID.set(info.id);
+                  maxID.lazySet(info.id);
                }
             }
 
@@ -2069,6 +2099,30 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       return new JournalLoadInformation(records.size(), maxID.longValue());
    }
 
+   private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
+                                                    final boolean changeData,
+                                                    final JournalState replicationSync) throws Exception {
+      final JournalState state = this.state; // read the field once so both guards test the same value
+      if (state == JournalState.STOPPED || state == JournalState.LOADED) {
+         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+                                            state);
+      }
+      if (state == replicationSync) {
+         throw new IllegalStateException("Journal cannot be in state " + state); // report the actual offending state
+      }
+      // Plain holder (no atomicity needed): lets the delegate hand back whatever buffer it stored so it is released below
+      final AtomicReference<ByteBuffer> wholeFileBufferRef = new AtomicReference<>();
+      try {
+         return load(loadManager, changeData, replicationSync, wholeFileBufferRef);
+      } finally {
+         final ByteBuffer wholeFileBuffer = wholeFileBufferRef.get(); // null when nothing was stored in the holder
+         if (wholeFileBuffer != null) {
+            fileFactory.releaseDirectBuffer(wholeFileBuffer); // released on both normal return and exception
+            wholeFileBufferRef.lazySet(null);
+         }
+      }
+   }
+
    /**
     * @return true if cleanup was called
     */
@@ -2808,12 +2862,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    /**
     * @throws Exception
     */
-   private void checkControlFile() throws Exception {
+   private void checkControlFile(AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
       ArrayList<String> dataFiles = new ArrayList<>();
       ArrayList<String> newFiles = new ArrayList<>();
       ArrayList<Pair<String, String>> renames = new ArrayList<>();
 
-      SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
+      SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames, wholeFileBufferRef);
       if (controlFile != null) {
          for (String dataFile : dataFiles) {
             SequentialFile file = fileFactory.createSequentialFile(dataFile);
@@ -2904,18 +2958,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    }
 
-   private static final class JournalFileComparator implements Comparator<JournalFile>, Serializable {
-
-      private static final long serialVersionUID = -6264728973604070321L;
-
-      @Override
-      public int compare(final JournalFile f1, final JournalFile f2) {
-         long id1 = f1.getFileID();
-         long id2 = f2.getFileID();
-
-         return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
-      }
-   }
+   private static final Comparator<JournalFile> JOURNAL_FILE_COMPARATOR = Comparator.comparingLong(JournalFile::getFileID); // ascending by file ID
 
    @Override
    public final void synchronizationLock() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58f617b3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index e05b5d0..155831f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 
 public class FakeSequentialFileFactory implements SequentialFileFactory {
 
@@ -418,6 +419,18 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
       }
 
       @Override
+      public synchronized void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+         // Delegates to the callback-based writeDirect, then waits on that callback; releaseBuffer is not consulted here.
+         final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+         try {
+            writeDirect(bytes, sync, completion);
+         } finally {
+            // Reached on normal return and when writeDirect throws alike.
+            completion.waitCompletion();
+         }
+      }
+
+      @Override
       public void sync() throws IOException {
          if (supportsCallback) {
             throw new IllegalStateException("sync is not supported when supportsCallback=true");