You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/22 15:33:13 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1996 MappedSequentialFileFactory may cause DirectByteBuffer off-heap memory leaks

Repository: activemq-artemis
Updated Branches:
  refs/heads/master ef7ff38de -> 5269df852


ARTEMIS-1996 MappedSequentialFileFactory may cause DirectByteBuffer off-heap memory leaks

Compaction now reuses direct ByteBuffers for both
reading and writing, with explicit and deterministic
release, to avoid high peaks of native memory utilisation
after compaction.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2967df6a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2967df6a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2967df6a

Branch: refs/heads/master
Commit: 2967df6a998b93cf471c39e25faa6d3a4c821ae0
Parents: ef7ff38
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Aug 15 14:39:41 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Mon Oct 22 09:16:15 2018 +0200

----------------------------------------------------------------------
 .../jdbc/store/file/JDBCSequentialFile.java     |   5 +
 .../artemis/core/io/SequentialFile.java         |  14 ++
 .../artemis/core/io/aio/AIOSequentialFile.java  |  35 ++++-
 .../core/io/aio/AIOSequentialFileFactory.java   |   9 +-
 .../core/io/mapped/MappedSequentialFile.java    |  22 +++
 .../core/io/mapped/TimedSequentialFile.java     |   5 +
 .../artemis/core/io/nio/NIOSequentialFile.java  |  23 ++-
 .../journal/impl/AbstractJournalUpdateTask.java |  85 ++++++----
 .../artemis/core/journal/impl/JournalImpl.java  | 155 ++++++++++++-------
 .../impl/fakes/FakeSequentialFileFactory.java   |  13 ++
 10 files changed, 272 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 0b88d9a..8bbf7b9 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -251,6 +251,11 @@ public class JDBCSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) {
+      writeDirect(bytes, sync, null);
+   }
+
+   @Override
    public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
       writeDirect(bytes, sync, null);
       // Are we meant to block here?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
index 49130e6..b6e4938 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
@@ -78,6 +78,20 @@ public interface SequentialFile {
    void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
 
    /**
+    * Write directly to the file without using any intermediate buffer and wait completion.<br>
+    * If {@code releaseBuffer} is {@code true} the provided {@code bytes} should be released
+    * through {@link SequentialFileFactory#releaseBuffer(ByteBuffer)}, if supported.
+    *
+    * @param bytes         the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *                      NIO). If {@code releaseBuffer} is {@code true} use a buffer from
+    *                      {@link SequentialFileFactory#newBuffer(int)}, {@link SequentialFileFactory#allocateDirectBuffer(int)}
+    *                      otherwise.
+    * @param sync          if {@code true} will durable flush the written data on the file, {@code false} otherwise
+    * @param releaseBuffer if {@code true} will release the buffer, {@code false} otherwise
+    */
+   void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception;
+
+   /**
     * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
     *              NIO). To be safe, use a buffer from the corresponding
     *              {@link SequentialFileFactory#newBuffer(int)}.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index 139b236..bc3c408 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -233,6 +233,33 @@ public class AIOSequentialFile extends AbstractSequentialFile {
       }
    }
 
+   @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Write Direct, Sync: true File: " + getFileName());
+      }
+
+      final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+      try {
+         checkOpened();
+      } catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+         completion.onError(-1, e.getMessage());
+         return;
+      }
+
+      final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+      final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+      final AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
+      runnableCallback.initWrite(positionToWrite, bytesToWrite);
+      runnableCallback.run();
+
+      completion.waitCompletion();
+   }
+
    /**
     * Note: Parameter sync is not used on AIO
     */
@@ -256,8 +283,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
    }
 
    AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
+      return getCallback(originalCallback, buffer, true);
+   }
+
+   AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback,
+                                                              ByteBuffer buffer,
+                                                              boolean releaseBuffer) {
       AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
-      callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
+      callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
       pendingCallbacks.countUp();
       return callback;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index d8288e6..3cdf9fe 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -286,7 +286,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
       String errorMessage;
       int errorCode = -1;
       long writeSequence;
-
+      boolean releaseBuffer;
       long position;
       int bytes;
 
@@ -297,6 +297,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
             ", errorMessage='" + errorMessage + '\'' +
             ", errorCode=" + errorCode +
             ", writeSequence=" + writeSequence +
+            ", releaseBuffer=" + releaseBuffer +
             ", position=" + position +
             '}';
       }
@@ -332,7 +333,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                                         IOCallback IOCallback,
                                         LibaioFile libaioFile,
                                         AIOSequentialFile sequentialFile,
-                                        ByteBuffer usedBuffer) {
+                                        ByteBuffer usedBuffer,
+                                        boolean releaseBuffer) {
          this.callback = IOCallback;
          this.sequentialFile = sequentialFile;
          this.error = false;
@@ -340,6 +342,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
          this.libaioFile = libaioFile;
          this.writeSequence = writeSequence;
          this.errorMessage = null;
+         this.releaseBuffer = releaseBuffer;
          return this;
       }
 
@@ -375,7 +378,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                callback.done();
             }
 
-            if (buffer != null && reuseBuffers) {
+            if (buffer != null && reuseBuffers && releaseBuffer) {
                buffersControl.bufferDone(buffer);
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 4c5e23a..efce280 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -274,6 +274,28 @@ final class MappedSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+      try {
+         checkIsOpen();
+         final int position = bytes.position();
+         final int limit = bytes.limit();
+         final int remaining = limit - position;
+         if (remaining > 0) {
+            this.mappedFile.write(bytes, position, remaining);
+            final int newPosition = position + remaining;
+            bytes.position(newPosition);
+            if (factory.isDatasync() && sync) {
+               this.mappedFile.force();
+            }
+         }
+      } finally {
+         if (releaseBuffer) {
+            this.factory.releaseBuffer(bytes);
+         }
+      }
+   }
+
+   @Override
    public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
index 8436ed5..e0a877a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -97,6 +97,11 @@ final class TimedSequentialFile implements SequentialFile {
    }
 
    @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      this.sequentialFile.blockingWriteDirect(bytes, sync, releaseBuffer);
+   }
+
+   @Override
    public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
       if (this.timedBuffer != null) {
          this.timedBuffer.addBytes(bytes, sync, callback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 55654b7..5f65b64 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -244,7 +244,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       }
 
       try {
-         internalWrite(bytes, sync, callback);
+         internalWrite(bytes, sync, callback, true);
       } catch (Exception e) {
          callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
       }
@@ -252,7 +252,12 @@ public class NIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
-      internalWrite(bytes, sync, null);
+      internalWrite(bytes, sync, null, true);
+   }
+
+   @Override
+   public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+      internalWrite(bytes, sync, null, releaseBuffer);
    }
 
    @Override
@@ -266,7 +271,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
 
    private void internalWrite(final ByteBuffer bytes,
                               final boolean sync,
-                              final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
+                              final IOCallback callback,
+                              boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
       if (!isOpen()) {
          if (callback != null) {
             callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
@@ -279,7 +285,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       position.addAndGet(bytes.limit());
 
       try {
-         doInternalWrite(bytes, sync, callback);
+         doInternalWrite(bytes, sync, callback, releaseBuffer);
       } catch (ClosedChannelException e) {
          throw e;
       } catch (IOException e) {
@@ -296,7 +302,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
     */
    private void doInternalWrite(final ByteBuffer bytes,
                                 final boolean sync,
-                                final IOCallback callback) throws IOException {
+                                final IOCallback callback,
+                                boolean releaseBuffer) throws IOException {
       try {
          channel.write(bytes);
 
@@ -308,8 +315,10 @@ public class NIOSequentialFile extends AbstractSequentialFile {
             callback.done();
          }
       } finally {
-         //release it to recycle the write buffer if big enough
-         this.factory.releaseBuffer(bytes);
+         if (releaseBuffer) {
+            //release it to recycle the write buffer if big enough
+            this.factory.releaseBuffer(bytes);
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 10f5008..8f61f99 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -152,10 +153,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
       }
    }
 
-   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+   static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
                                                 final List<String> dataFiles,
                                                 final List<String> newFiles,
-                                                final List<Pair<String, String>> renameFile) throws Exception {
+                                                final List<Pair<String, String>> renameFile,
+                                                final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
       SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
 
       if (controlFile.exists()) {
@@ -163,13 +165,12 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
          final ArrayList<RecordInfo> records = new ArrayList<>();
 
-
          JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
             @Override
             public void onReadAddRecord(final RecordInfo info) throws Exception {
                records.add(info);
             }
-         });
+         }, wholeFileBufferRef);
 
          if (records.size() == 0) {
             // the record is damaged
@@ -205,29 +206,48 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
       }
    }
 
+   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+                                                final List<String> dataFiles,
+                                                final List<String> newFiles,
+                                                final List<Pair<String, String>> renameFile) throws Exception {
+      return readControlFile(fileFactory, dataFiles, newFiles, renameFile, null);
+   }
+
+   private void flush(boolean releaseWritingBuffer) throws Exception {
+      if (writingChannel != null) {
+         try {
+            if (sequentialFile.isOpen()) {
+               try {
+                  sequentialFile.position(0);
+
+                  // To Fix the size of the file
+                  writingChannel.writerIndex(writingChannel.capacity());
+
+                  final ByteBuffer byteBuffer = bufferWrite;
+                  final int readerIndex = writingChannel.readerIndex();
+                  byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
+                  sequentialFile.blockingWriteDirect(byteBuffer, true, false);
+               } finally {
+                  sequentialFile.close();
+                  newDataFiles.add(currentFile);
+               }
+            }
+         } finally {
+            if (releaseWritingBuffer) {
+               //deterministic release of native resources
+               fileFactory.releaseDirectBuffer(bufferWrite);
+               writingChannel = null;
+               bufferWrite = null;
+            }
+         }
+      }
+   }
 
    /**
     * Write pending output into file
     */
    public void flush() throws Exception {
-      if (writingChannel != null) {
-         sequentialFile.position(0);
-
-         // To Fix the size of the file
-         writingChannel.writerIndex(writingChannel.capacity());
-
-         bufferWrite.clear()
-            .position(writingChannel.readerIndex())
-            .limit(writingChannel.readableBytes());
-
-         sequentialFile.writeDirect(bufferWrite, true);
-         sequentialFile.close();
-         newDataFiles.add(currentFile);
-      }
-
-      bufferWrite = null;
-
-      writingChannel = null;
+      flush(true);
    }
 
    public boolean containsRecord(final long id) {
@@ -243,11 +263,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
     */
 
    protected void openFile() throws Exception {
-      flush();
-
-      bufferWrite = fileFactory.newBuffer(journal.getFileSize());
-
-      writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+      flush(false);
 
       currentFile = filesRepository.openFileCMP();
 
@@ -257,6 +273,21 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
       currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
 
+      final int fileSize = journal.getFileSize();
+      if (bufferWrite != null && bufferWrite.capacity() < fileSize) {
+         fileFactory.releaseDirectBuffer(bufferWrite);
+         bufferWrite = null;
+         writingChannel = null;
+      }
+      if (bufferWrite == null) {
+         final ByteBuffer bufferWrite = fileFactory.allocateDirectBuffer(fileSize);
+         this.bufferWrite = bufferWrite;
+         writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+      } else {
+         writingChannel.clear();
+         bufferWrite.clear();
+      }
+
       JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 47bdc5b..fe08ed8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.journal.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -42,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -461,17 +461,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       // Now order them by ordering id - we can't use the file name for ordering
       // since we can re-use dataFiles
 
-      Collections.sort(orderedFiles, new JournalFileComparator());
+      Collections.sort(orderedFiles, JOURNAL_FILE_COMPARATOR);
 
       return orderedFiles;
    }
 
-   /**
-    * this method is used internally only however tools may use it to maintenance.
-    */
-   public static int readJournalFile(final SequentialFileFactory fileFactory,
-                                     final JournalFile file,
-                                     final JournalReaderCallback reader) throws Exception {
+   private static ByteBuffer allocateDirectBufferIfNeeded(final SequentialFileFactory fileFactory,
+                                                          final int requiredCapacity,
+                                                          final AtomicReference<ByteBuffer> bufferRef) {
+      ByteBuffer buffer = bufferRef != null ? bufferRef.get() : null;
+      if (buffer != null && buffer.capacity() < requiredCapacity) {
+         fileFactory.releaseDirectBuffer(buffer);
+         buffer = null;
+      }
+      if (buffer == null) {
+         buffer = fileFactory.allocateDirectBuffer(requiredCapacity);
+      } else {
+         buffer.clear().limit(requiredCapacity);
+      }
+      if (bufferRef != null) {
+         bufferRef.lazySet(buffer);
+      }
+      return buffer;
+   }
+
+   static int readJournalFile(final SequentialFileFactory fileFactory,
+                              final JournalFile file,
+                              final JournalReaderCallback reader,
+                              final AtomicReference<ByteBuffer> wholeFileBufferReference) throws Exception {
       file.getFile().open(1, false);
       ByteBuffer wholeFileBuffer = null;
       try {
@@ -481,8 +498,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             // the file is damaged or the system crash before it was able to write
             return -1;
          }
-
-         wholeFileBuffer = fileFactory.newBuffer(filesize);
+         wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
 
          final int journalFileSize = file.getFile().read(wholeFileBuffer);
 
@@ -771,16 +787,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             lastDataPos = wholeFileBuffer.position();
 
          }
-
          return lastDataPos;
       } catch (Throwable e) {
          ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
          throw new Exception(e.getMessage(), e);
       } finally {
-         if (wholeFileBuffer != null) {
-            fileFactory.releaseBuffer(wholeFileBuffer);
+         if (wholeFileBufferReference == null && wholeFileBuffer != null) {
+            fileFactory.releaseDirectBuffer(wholeFileBuffer);
          }
-
          try {
             file.getFile().close();
          } catch (Throwable ignored) {
@@ -788,6 +802,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
+   /**
+    * this method is used internally only however tools may use it to maintenance.
+    */
+   public static int readJournalFile(final SequentialFileFactory fileFactory,
+                                     final JournalFile file,
+                                     final JournalReaderCallback reader) throws Exception {
+      return readJournalFile(fileFactory, file, reader, null);
+   }
+
    // Journal implementation
    // ----------------------------------------------------------------
 
@@ -1594,19 +1617,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                journalLock.writeLock().unlock();
             }
 
-            Collections.sort(dataFilesToProcess, new JournalFileComparator());
+            Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
 
             // This is where most of the work is done, taking most of the time of the compacting routine.
             // Notice there are no locks while this is being done.
 
             // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
             // well
-            for (final JournalFile file : dataFilesToProcess) {
-               try {
-                  JournalImpl.readJournalFile(fileFactory, file, compactor);
-               } catch (Throwable e) {
-                  ActiveMQJournalLogger.LOGGER.compactReadError(file);
-                  throw new Exception("Error on reading compacting for " + file, e);
+            // this AtomicReference is not used for thread-safety, but just as a reference
+            final AtomicReference<ByteBuffer> wholeFileBufferRef = dataFilesToProcess.isEmpty() ? null : new AtomicReference<>();
+            try {
+               for (final JournalFile file : dataFilesToProcess) {
+                  try {
+                     JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
+                  } catch (Throwable e) {
+                     ActiveMQJournalLogger.LOGGER.compactReadError(file);
+                     throw new Exception("Error on reading compacting for " + file, e);
+                  }
+               }
+            } finally {
+               ByteBuffer wholeFileBuffer;
+               if (wholeFileBufferRef != null && (wholeFileBuffer = wholeFileBufferRef.get()) != null) {
+                  fileFactory.releaseDirectBuffer(wholeFileBuffer);
                }
             }
 
@@ -1758,16 +1790,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
     */
    private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
                                                     final boolean changeData,
-                                                    final JournalState replicationSync) throws Exception {
-      if (state == JournalState.STOPPED || state == JournalState.LOADED) {
-         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
-                                            state);
-      }
-      if (state == replicationSync) {
-         throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
-      }
+                                                    final JournalState replicationSync,
+                                                    final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
+      JournalState state;
+      assert (state = this.state) != JournalState.STOPPED &&
+         state != JournalState.LOADED &&
+         state != replicationSync;
 
-      checkControlFile();
+      checkControlFile(wholeFileBufferRef);
 
       records.clear();
 
@@ -1796,7 +1826,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             private void checkID(final long id) {
                if (id > maxID.longValue()) {
-                  maxID.set(id);
+                  maxID.lazySet(id);
                }
             }
 
@@ -1804,7 +1834,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadAddRecord(final RecordInfo info) throws Exception {
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.addRecord(info);
 
@@ -1815,7 +1845,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadUpdateRecord(final RecordInfo info) throws Exception {
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.updateRecord(info);
 
@@ -1833,7 +1863,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             @Override
             public void onReadDeleteRecord(final long recordID) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                loadManager.deleteRecord(recordID);
 
@@ -1854,7 +1884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
                checkID(info.id);
 
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1880,7 +1910,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             @Override
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1908,7 +1938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             public void onReadPrepareRecord(final long transactionID,
                                             final byte[] extraData,
                                             final int numberOfRecords) throws Exception {
-               hasData.set(true);
+               hasData.lazySet(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
 
@@ -1981,7 +2011,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                      journalTransaction.forget();
                   }
 
-                  hasData.set(true);
+                  hasData.lazySet(true);
                }
 
             }
@@ -2005,16 +2035,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   // Rollbacks.. We will ignore the data anyway.
                   tnp.rollback(file);
 
-                  hasData.set(true);
+                  hasData.lazySet(true);
                }
             }
 
             @Override
             public void markAsDataFile(final JournalFile file) {
-               hasData.set(true);
+               hasData.lazySet(true);
             }
 
-         });
+         }, wholeFileBufferRef);
 
          if (hasData.get()) {
             lastDataPos = resultLastPost;
@@ -2050,7 +2080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          } else {
             for (RecordInfo info : transaction.recordInfos) {
                if (info.id > maxID.get()) {
-                  maxID.set(info.id);
+                  maxID.lazySet(info.id);
                }
             }
 
@@ -2069,6 +2099,30 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       return new JournalLoadInformation(records.size(), maxID.longValue());
    }
 
+   private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
+                                                    final boolean changeData,
+                                                    final JournalState replicationSync) throws Exception {
+      final JournalState state = this.state; // read the field once so both checks below use the same value
+      if (state == JournalState.STOPPED || state == JournalState.LOADED) {
+         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+                                            state);
+      }
+      if (state == replicationSync) {
+         throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED); // NOTE(review): message names STARTED but the guard compares against replicationSync - confirm intent
+      }
+      // AtomicReference is only a mutable holder passed to the 4-arg load (which presumably stores a buffer in it); atomicity is not relied upon
+      final AtomicReference<ByteBuffer> wholeFileBufferRef = new AtomicReference<>();
+      try {
+         return load(loadManager, changeData, replicationSync, wholeFileBufferRef);
+      } finally { // cleanup runs whether the delegated load returns or throws
+         final ByteBuffer wholeFileBuffer = wholeFileBufferRef.get();
+         if (wholeFileBuffer != null) {
+            fileFactory.releaseDirectBuffer(wholeFileBuffer); // hand the buffer back to the factory explicitly
+            wholeFileBufferRef.lazySet(null); // clear the holder so the released buffer is no longer reachable through it
+         }
+      }
+   }
+
    /**
     * @return true if cleanup was called
     */
@@ -2808,12 +2862,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    /**
     * @throws Exception
     */
-   private void checkControlFile() throws Exception {
+   private void checkControlFile(AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
       ArrayList<String> dataFiles = new ArrayList<>();
       ArrayList<String> newFiles = new ArrayList<>();
       ArrayList<Pair<String, String>> renames = new ArrayList<>();
 
-      SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
+      SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames, wholeFileBufferRef);
       if (controlFile != null) {
          for (String dataFile : dataFiles) {
             SequentialFile file = fileFactory.createSequentialFile(dataFile);
@@ -2904,18 +2958,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    }
 
-   private static final class JournalFileComparator implements Comparator<JournalFile>, Serializable {
-
-      private static final long serialVersionUID = -6264728973604070321L;
-
-      @Override
-      public int compare(final JournalFile f1, final JournalFile f2) {
-         long id1 = f1.getFileID();
-         long id2 = f2.getFileID();
-
-         return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
-      }
-   }
+   private static final Comparator<JournalFile> JOURNAL_FILE_COMPARATOR = Comparator.comparingLong(JournalFile::getFileID); // orders journal files by ascending file id
 
    @Override
    public final void synchronizationLock() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2967df6a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index e05b5d0..155831f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 
 public class FakeSequentialFileFactory implements SequentialFileFactory {
 
@@ -418,6 +419,18 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
       }
 
       @Override
+      public synchronized void blockingWriteDirect(ByteBuffer bytes,
+                                                   boolean sync,
+                                                   boolean releaseBuffer) throws Exception { // NOTE(review): releaseBuffer is not referenced in this body
+         SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); // fresh callback per call; passed to writeDirect and waited on below
+         try {
+            writeDirect(bytes, sync, callback);
+         } finally {
+            callback.waitCompletion(); // reached even when writeDirect throws; NOTE(review): presumably blocks until the callback is signalled - confirm it cannot hang if writeDirect failed first
+         }
+      }
+
+      @Override
       public void sync() throws IOException {
          if (supportsCallback) {
             throw new IllegalStateException("sync is not supported when supportsCallback=true");


[2/2] activemq-artemis git commit: This closes #2250

Posted by cl...@apache.org.
This closes #2250


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5269df85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5269df85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5269df85

Branch: refs/heads/master
Commit: 5269df852559d5d85848ff9042fff89e044e3943
Parents: ef7ff38 2967df6
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 22 11:33:07 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 22 11:33:07 2018 -0400

----------------------------------------------------------------------
 .../jdbc/store/file/JDBCSequentialFile.java     |   5 +
 .../artemis/core/io/SequentialFile.java         |  14 ++
 .../artemis/core/io/aio/AIOSequentialFile.java  |  35 ++++-
 .../core/io/aio/AIOSequentialFileFactory.java   |   9 +-
 .../core/io/mapped/MappedSequentialFile.java    |  22 +++
 .../core/io/mapped/TimedSequentialFile.java     |   5 +
 .../artemis/core/io/nio/NIOSequentialFile.java  |  23 ++-
 .../journal/impl/AbstractJournalUpdateTask.java |  85 ++++++----
 .../artemis/core/journal/impl/JournalImpl.java  | 155 ++++++++++++-------
 .../impl/fakes/FakeSequentialFileFactory.java   |  13 ++
 10 files changed, 272 insertions(+), 94 deletions(-)
----------------------------------------------------------------------